home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / client / sysint / sys-getattr.sm < prev    next >
Text File  |  2010-07-23  |  47KB  |  1,414 lines

  1. /* 
  2.  * (C) 2003 Clemson University and The University of Chicago 
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. /** \file
  8.  *  \ingroup sysint
  9.  *
  10.  *  PVFS2 system interface routines for obtaining attributes of an object
  11.  *  (file or directory).
  12.  */
  13. #include <string.h>
  14. #include <assert.h>
  15.  
  16. #include "client-state-machine.h"
  17. #include "pvfs2-debug.h"
  18. #include "job.h"
  19. #include "gossip.h"
  20. #include "str-utils.h"
  21. #include "pint-util.h"
  22. #include "pvfs2-util.h"
  23. #include "pint-cached-config.h"
  24. #include "PINT-reqproto-encode.h"
  25. #include "pvfs2-internal.h"
  26. #include "pvfs2-types-debug.h"
  27.  
  28. /* pvfs2_client_getattr_sm
  29.  *
  30.  * The sm_p->msgpair structure is used to get the attributes of the
  31.  * object itself.  We convert the original attribute mask (in
  32.  * sm_p->u.getattr.attrmask) to ask for datafile and distribution info
  33.  * if the user asked for file size (PVFS_ATTR_SYS_SIZE).  This allows
  34.  * us to obtain this information (if the object turns out to be a
  35.  * metafile) so that we can later look up the datafile sizes and
  36.  * calculate the overall file size.
  37.  *
  38.  * The sm_p->msgpairarray is used to get datafile sizes, if it turns
  39.  * out that we need them.  This space will also need to be freed, if
  40.  * we grab these sizes.
  41.  */
  42.  
  43. static struct profiler getattr_prof __attribute__((unused));
  44.  
  45. extern job_context_id pint_client_sm_context;
  46.  
  47. enum
  48. {
  49.     GETATTR_ACACHE_MISS = 1,
  50.     GETATTR_NEED_DATAFILE_SIZES = 2,
  51.     GETATTR_IO_RETRY = 3
  52. };
  53.  
  54. /* completion function prototypes */
  55. static int getattr_object_getattr_comp_fn(
  56.     void *v_p, struct PVFS_server_resp *resp_p, int index);
  57. static int getattr_datafile_getattr_comp_fn(
  58.     void *v_p, struct PVFS_server_resp *resp_p, int index);
  59.  
  60. %%
  61.  
  62. nested machine pvfs2_client_datafile_getattr_sizes_sm
  63. {
  64.     state datafile_getattr_setup_msgpairarray
  65.     {
  66.         run getattr_datafile_getattr_setup_msgpairarray;
  67.         success => datafile_getattr_xfer_msgpairarray;
  68.         default => datafile_getattr_cleanup;
  69.     }
  70.     
  71.     state datafile_getattr_xfer_msgpairarray
  72.     {
  73.         jump pvfs2_msgpairarray_sm;
  74.         default => datafile_getattr_retry;
  75.     }
  76.  
  77.     state datafile_getattr_retry
  78.     {
  79.         run getattr_datafile_getattr_retry;
  80.         GETATTR_IO_RETRY => datafile_getattr_xfer_msgpairarray;
  81.         default => datafile_getattr_cleanup;
  82.     }
  83.  
  84.     state datafile_getattr_cleanup
  85.     {
  86.     run getattr_datafile_getattr_cleanup;
  87.         default => return;
  88.     }
  89. }
  90.  
  91.  
  92. nested machine pvfs2_client_getattr_sm
  93. {
  94.     state acache_lookup
  95.     {
  96.         run getattr_acache_lookup;
  97.         GETATTR_ACACHE_MISS => object_getattr_setup_msgpair;
  98.         GETATTR_NEED_DATAFILE_SIZES => datafile_get_sizes;
  99.         default => cleanup;
  100.     }
  101.  
  102.     state object_getattr_setup_msgpair
  103.     {
  104.         run getattr_object_getattr_setup_msgpair;
  105.         success => object_getattr_xfer_msgpair;
  106.         default => cleanup;
  107.     }
  108.  
  109.     state object_getattr_xfer_msgpair
  110.     {
  111.         jump pvfs2_msgpairarray_sm;
  112.         success => acache_insert;
  113.         GETATTR_NEED_DATAFILE_SIZES => datafile_get_sizes;
  114.         default => object_getattr_failure;
  115.     }
  116.  
  117.     state acache_insert
  118.     {
  119.         run getattr_acache_insert;
  120.         default => cleanup;
  121.     }
  122.  
  123.     state object_getattr_failure
  124.     {
  125.         run getattr_object_getattr_failure;
  126.         default => cleanup;
  127.     }
  128.  
  129.     state datafile_get_sizes
  130.     {
  131.     jump pvfs2_client_datafile_getattr_sizes_sm;
  132.     success => acache_insert;
  133.         default => cleanup;
  134.     }
  135.  
  136.     state cleanup
  137.     {
  138.         run getattr_cleanup;
  139.         default => return;
  140.     }
  141. }
  142.  
  143. machine pvfs2_client_sysint_getattr_sm
  144. {
  145.     state dowork
  146.     {
  147.         jump pvfs2_client_getattr_sm;
  148.         default => set_sys_response; 
  149.     }
  150.  
  151.     state set_sys_response
  152.     {
  153.         run getattr_set_sys_response;
  154.         default => terminate;
  155.     }
  156. }
  157.     
  158. %%
  159.  
  160.  
  161. /** Initiate retrieval of object attributes.
  162.  */
  163. PVFS_error PVFS_isys_getattr(
  164.     PVFS_object_ref ref,
  165.     uint32_t attrmask, 
  166.     const PVFS_credentials *credentials,
  167.     PVFS_sysresp_getattr *resp_p,
  168.     PVFS_sys_op_id *op_id,
  169.     PVFS_hint hints,
  170.     void *user_ptr)
  171. {
  172.     PVFS_error ret = -PVFS_EINVAL;
  173.     PINT_smcb *smcb = NULL;
  174.     PINT_client_sm *sm_p = NULL;
  175.  
  176.     gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_isys_getattr entered\n");
  177.  
  178.     if ((ref.handle == PVFS_HANDLE_NULL) ||
  179.         (ref.fs_id == PVFS_FS_ID_NULL) || (resp_p == NULL))
  180.     {
  181.         gossip_err("invalid (NULL) required argument\n");
  182.         return ret;
  183.     }
  184.     
  185.     if (attrmask & ~(PVFS_ATTR_SYS_ALL))
  186.     {
  187.         gossip_err("invalid attrmask\n");
  188.         return ret;
  189.     }
  190.  
  191.     PINT_smcb_alloc(&smcb, PVFS_SYS_GETATTR, 
  192.             sizeof(struct PINT_client_sm),
  193.             client_op_state_get_machine,
  194.             client_state_machine_terminate,
  195.             pint_client_sm_context);
  196.     if (smcb == NULL)
  197.     {
  198.         return -PVFS_ENOMEM;
  199.     }
  200.     sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  201.  
  202.     PINT_init_msgarray_params(sm_p, ref.fs_id);
  203.     PINT_init_sysint_credentials(sm_p->cred_p, credentials);
  204.     sm_p->error_code = 0;
  205.     sm_p->object_ref = ref;
  206.     sm_p->u.getattr.getattr_resp_p = resp_p;
  207.     PVFS_hint_copy(hints, &sm_p->hints);
  208.     PVFS_hint_add(&sm_p->hints, PVFS_HINT_HANDLE_NAME, sizeof(PVFS_handle)
  209.                  ,&ref.handle);
  210.     
  211.     PINT_SM_GETATTR_STATE_FILL(
  212.         sm_p->getattr,
  213.         ref,
  214.         PVFS_util_sys_to_object_attr_mask(
  215.             attrmask),
  216.         PVFS_TYPE_NONE,
  217.         0);
  218.  
  219.     return PINT_client_state_machine_post(
  220.         smcb,  op_id, user_ptr);
  221. }
  222.  
  223. /** Retrieve object attributes.
  224.  */
  225. PVFS_error PVFS_sys_getattr(
  226.     PVFS_object_ref ref,
  227.     uint32_t attrmask, 
  228.     const PVFS_credentials *credentials,
  229.     PVFS_sysresp_getattr *resp_p,
  230.     PVFS_hint hints)
  231. {
  232.     PVFS_error ret, error;
  233.     PVFS_sys_op_id op_id;
  234.  
  235.     gossip_debug(GOSSIP_CLIENT_DEBUG, "PVFS_sys_getattr entered\n");
  236.  
  237.     ret = PVFS_isys_getattr(ref, attrmask, credentials,
  238.                             resp_p, &op_id, hints, NULL);
  239.     if (ret)
  240.     {
  241.         PVFS_perror_gossip("PVFS_isys_getattr call", ret);
  242.         return ret;
  243.     }
  244.  
  245.     if(op_id != -1)
  246.     {
  247.         /* did not complete immediately, so we wait */
  248.  
  249.         ret = PVFS_sys_wait(op_id, "getattr", &error);
  250.         if (ret)
  251.         {
  252.             PVFS_perror_gossip("PVFS_sys_wait call", ret);
  253.         }
  254.         if(error)
  255.         {
  256.             ret = error;
  257.         }
  258.         PINT_sys_release(op_id);
  259.     }
  260.  
  261.     return ret;
  262. }
  263.  
  264.  
  265. /**
  266.  * getattr_acache_lookup
  267.  * @ingroup client_sm_getattr
  268.  * 
  269.  * This function is invoked as the first state action of the
  270.  * getattr-dowork state machine.  It performs a lookup into the
  271.  * attribute cache for the attribute in question, and returns
  272.  * result codes for a hit or a miss.  
  273.  * 
  274.  * @param smcb This must be a valid client state machine handle, with
  275.  * the @ref getattr field containing valid values for the
  276.  * fsid/handle of the desired attribute (in object_ref), as well as the 
  277.  * requested attribute mask (req_attrmask).
  278.  *
  279.  * @param js_p Contains the return code to be set by this function in the
  280.  * @ref error_code field.  This determines the next state action to jump
  281.  * to in the getattr-dowork state machine.  Possible values for js_p->error_code
  282.  * are:
  283.  * <ul>
  284.  * <li><b>GETATTR_ACACHE_MISS</b> - The requested attribute was not found
  285.  * in the attribute cache, or the attribute was found, but the attribute's
  286.  * mask did not include values required by the requested mask (req_attrmask).
  287.  * </li>
  288.  * <li><b>GETATTR_NEED_DATAFILE_SIZES</b> - The requested attribute was found
  289.  * in the attribute cache and the mask was sufficient, but the data file size
  290.  * was requested for this handle, so we need to get that next.
  291.  * </li>
  292.  * <li><b>default</b> - The requested attribute was found in the attribute
  293.  * cache and its mask values were sufficient.
  294.  * </li>
  295.  *
  296.  * @return This function should always return 1 unless an error occurred
  297.  * within the internals of the state machine.
  298.  */
  299. static PINT_sm_action getattr_acache_lookup(
  300.         struct PINT_smcb *smcb, job_status_s *js_p)
  301. {
  302.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  303.     uint32_t trimmed_mask = 0;
  304.     int missing_attrs;
  305.     PVFS_object_ref object_ref;
  306.     int ret = -1;
  307.     int attr_status = -1;
  308.     int size_status = -1;
  309.  
  310.     js_p->error_code = 0;
  311.  
  312.     object_ref = sm_p->getattr.object_ref;
  313.  
  314.     assert(object_ref.handle != PVFS_HANDLE_NULL);
  315.     assert(object_ref.fs_id != PVFS_FS_ID_NULL);
  316.  
  317.     gossip_debug(GOSSIP_ACACHE_DEBUG, "%s: handle %llu fsid %d\n",
  318.       __func__, llu(object_ref.handle), object_ref.fs_id);
  319.  
  320.    
  321.     /* The sys attr mask request is converted to object
  322.      * attr mask values for comparison with the cached
  323.      */
  324.     if(sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
  325.     {
  326.     sm_p->getattr.req_attrmask |= PVFS_ATTR_META_ALL;
  327.     }
  328.  
  329.     if(sm_p->getattr.flags & PINT_SM_GETATTR_BYPASS_CACHE)
  330.     {
  331.         gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: forced acache miss: "
  332.                     " [%llu]\n",
  333.                       llu(object_ref.handle));
  334.         js_p->error_code = GETATTR_ACACHE_MISS;
  335.         return SM_ACTION_COMPLETE;
  336.     }
  337.  
  338.     ret = PINT_acache_get_cached_entry(object_ref,
  339.         &sm_p->getattr.attr,
  340.         &attr_status,
  341.         &sm_p->getattr.size,
  342.         &size_status);
  343.     if(ret < 0 || attr_status < 0)
  344.     {
  345.         gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: clean acache miss: "
  346.                     " [%llu]\n",
  347.                       llu(object_ref.handle));
  348.   
  349.         js_p->error_code = GETATTR_ACACHE_MISS;
  350.         return SM_ACTION_COMPLETE;
  351.     }
  352.  
  353.     /* acache hit, check results */
  354.   
  355.     /* The sys attr mask request is converted to object
  356.      * attr mask values for comparison with the cached
  357.      * entry
  358.      */
  359.     trimmed_mask = sm_p->getattr.req_attrmask;
  360.  
  361.     gossip_debug(GOSSIP_GETATTR_DEBUG,"request attrmask:\n");
  362.     PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,sm_p->getattr.req_attrmask);
  363.  
  364.     gossip_debug(GOSSIP_GETATTR_DEBUG,"trimmed attrmask:\n");
  365.     PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,trimmed_mask);
  366.  
  367.     gossip_debug(GOSSIP_GETATTR_DEBUG,"returned attrmask:\n");
  368.     PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,sm_p->getattr.attr.mask);
  369.  
  370.     /* the trimmed mask is used for making sure that we're only
  371.      * checking attr bits that make sense for the object type
  372.      * since the caller may have requested all attributes in
  373.      * the case where it doesn't know what type of object we're
  374.      * doing the getattr against.
  375.      */
  376.     if (sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE)
  377.     {
  378.         trimmed_mask &= (PVFS_ATTR_META_ALL |
  379.                          PVFS_ATTR_META_UNSTUFFED |
  380.                          PVFS_ATTR_DATA_SIZE |
  381.                          PVFS_ATTR_COMMON_ALL);
  382.     }
  383.     else if (sm_p->getattr.attr.objtype == PVFS_TYPE_SYMLINK)
  384.     {
  385.         trimmed_mask &= (PVFS_ATTR_SYMLNK_ALL | PVFS_ATTR_COMMON_ALL);
  386.     }
  387.     else if (sm_p->getattr.attr.objtype == PVFS_TYPE_DIRECTORY)
  388.     {
  389.         trimmed_mask &= (PVFS_ATTR_COMMON_ALL | PVFS_ATTR_DIR_ALL);
  390.     }
  391.   
  392.     /* trimmed_mask contains the list of attributes
  393.      * requested for a particular object, 
  394.      * while sm_p->getattr.attr.mask contains
  395.      * the list of attributes cached for that object.
  396.      * The cached attributes can be used if requested
  397.      * is less than cached, i.e. all the attributes
  398.      * we need are already cached.  So we need to do
  399.      * a bitwise comparison of requested <= cached.
  400.      *
  401.      * xor of the two masks gives us the bits that are different,
  402.      * and-ing that result with the requested mask gives us the
  403.      * bits in the requested mask but not in the cached mask.
  404.      */
  405.     missing_attrs = ((trimmed_mask ^ sm_p->getattr.attr.mask) &
  406.                      trimmed_mask);
  407.  
  408.     gossip_debug(GOSSIP_GETATTR_DEBUG,"missing attrmask BEFORE:\n");
  409.     PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,missing_attrs);
  410.  
  411.     if ( missing_attrs & PVFS_ATTR_META_MIRROR_DFILES )
  412.     {
  413.        /*Mirroring is optional, so remove mirror-dfiles*/
  414.          missing_attrs &= ~PVFS_ATTR_META_MIRROR_DFILES;   
  415.     }
  416.          
  417.     gossip_debug(GOSSIP_GETATTR_DEBUG,"missing attrmask AFTER:\n");
  418.     PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,missing_attrs);
  419.  
  420.     if((missing_attrs == PVFS_ATTR_DATA_SIZE && size_status == 0) ||
  421.         (missing_attrs == 0))
  422.     {
  423.         /* nothing's missing, a hit! */
  424.         gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache hit "
  425.                      "[%llu]\n", llu(object_ref.handle));
  426.         js_p->error_code = 0;
  427.         return SM_ACTION_COMPLETE;
  428.     }
  429.   
  430.     /* check if the only thing missing is the file size, then
  431.      * we don't need to do the object getattr operation, we only
  432.      * need to do the datafile getattr operation, so we return
  433.      * the GETATTR_NEED_DATAFILE_SIZES error code which will make
  434.      * the getattr-dowork state machine jump to the datafile getattr
  435.      * operation state. 
  436.      */
  437.     if(missing_attrs == PVFS_ATTR_DATA_SIZE)
  438.     {
  439.         if(!(sm_p->getattr.attr.mask & PVFS_ATTR_META_UNSTUFFED))
  440.         {
  441.             /* We are missing the size, and we don't know for sure if the
  442.              * file has been unstuffed.  In this case, act as though we
  443.              * missed on all atributes so that we can get fresh stuffed size
  444.              * or datafile information as needed.
  445.              */
  446.         }
  447.         else
  448.         {
  449.             /* if the file size is requested but the distribution info
  450.              * isn't and it hasn't been cached, then we need to
  451.              * get that first.
  452.              */
  453.  
  454.             js_p->error_code = GETATTR_NEED_DATAFILE_SIZES;
  455.             gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache hit, need sizes"
  456.                          "[%llu]\n", llu(object_ref.handle));
  457.             return SM_ACTION_COMPLETE;
  458.         }
  459.     }
  460.  
  461.     /* we missed */
  462.     /* clean out the attributes we got from the cache; this will be
  463.      * overwritten when we request updated information from the server
  464.      */
  465.     PINT_free_object_attr(&sm_p->getattr.attr);
  466.     gossip_debug(GOSSIP_ACACHE_DEBUG, "acache: acache miss due to mask: "
  467.         " [%llu]\n",
  468.           llu(object_ref.handle));
  469.  
  470.     js_p->error_code = GETATTR_ACACHE_MISS;
  471.     return SM_ACTION_COMPLETE;
  472. }
  473.  
  474.  
  475. static PINT_sm_action getattr_object_getattr_setup_msgpair(
  476.         struct PINT_smcb *smcb, job_status_s *js_p)
  477. {
  478.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  479.     int ret = -PVFS_EINVAL;
  480.     PVFS_object_ref object_ref;
  481.     PINT_sm_msgpair_state *msg_p;
  482.  
  483.     gossip_debug(GOSSIP_CLIENT_DEBUG, "(%p) %s\n", sm_p, __func__);
  484.  
  485.     js_p->error_code = 0;
  486.  
  487.     PINT_msgpair_init(&sm_p->msgarray_op);
  488.     msg_p = &sm_p->msgarray_op.msgpair;
  489.  
  490.     object_ref = sm_p->getattr.object_ref;
  491.  
  492.     assert(object_ref.fs_id != PVFS_FS_ID_NULL);
  493.     assert(object_ref.handle != PVFS_HANDLE_NULL);
  494.  
  495.     /* setup the msgpair to do a getattr operation */
  496.     PINT_SERVREQ_GETATTR_FILL(
  497.         msg_p->req,
  498.         *sm_p->cred_p,
  499.         object_ref.fs_id,
  500.         object_ref.handle,
  501.         sm_p->getattr.req_attrmask,
  502.         sm_p->hints);
  503.     
  504.     msg_p->fs_id = object_ref.fs_id;
  505.     msg_p->handle = object_ref.handle;
  506.     msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
  507.     msg_p->comp_fn = getattr_object_getattr_comp_fn;
  508.  
  509.     ret = PINT_cached_config_map_to_server(
  510.         &msg_p->svr_addr, msg_p->handle,
  511.         msg_p->fs_id);
  512.     if (ret)
  513.     {
  514.         gossip_err("Failed to map meta server address\n");
  515.         js_p->error_code = ret;
  516.     }
  517.  
  518.     PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
  519.     return SM_ACTION_COMPLETE;
  520. }
  521.  
  522. /*
  523.   copies data from getattr response into the user supplied sys_attr
  524.   structure.  returns 0 for directories and symlinks, and
  525.   GETATTR_NEED_DATAFILE_SIZES for a metafile (when appropriate)
  526. */
  527. static int getattr_object_getattr_comp_fn(
  528.     void *v_p,
  529.     struct PVFS_server_resp *resp_p,
  530.     int index)
  531. {
  532.     PVFS_object_attr *attr = NULL;
  533.     PINT_smcb *smcb = v_p;
  534.     PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
  535.  
  536.     assert(resp_p->op == PVFS_SERV_GETATTR);
  537.  
  538.     gossip_debug(GOSSIP_GETATTR_DEBUG,
  539.                  "getattr_object_getattr_comp_fn called\n");
  540.  
  541.     if (resp_p->status != 0)
  542.     {
  543.         return resp_p->status;
  544.     }
  545.  
  546.     /*
  547.      * If we've reached the callback for the getattr msgpair tranfer,
  548.      * then we can make a copy of the retrieved attribute for later
  549.      * caching.
  550.      */
  551.     PINT_copy_object_attr(&sm_p->getattr.attr,
  552.                           &resp_p->u.getattr.attr);
  553.  
  554.     attr =  &sm_p->getattr.attr;
  555.  
  556.     /* if the ref_type mask is set to a non-zero value (!PVFS_TYPE_NONE)
  557.      * a -PVFS_error will be triggered if the
  558.      * attributes received are not one of the the types specified.
  559.      * This is useful so that the client can know (in some cases) that it
  560.      * can avoid issuing an operation to the server since the server will 
  561.      * just pass an error back anyway.
  562.      */
  563.     if(sm_p->getattr.ref_type &&
  564.        sm_p->getattr.ref_type != attr->objtype)
  565.     {
  566.         int ret;
  567.         gossip_debug(GOSSIP_CLIENT_DEBUG, "*** "
  568.                      "getattr_comp_fn: Object type mismatch.\n Possibly "
  569.                      "saving network roundtrip by returning an error\n");
  570.  
  571.         if (sm_p->getattr.ref_type == PVFS_TYPE_DIRECTORY)
  572.         {
  573.             ret = -PVFS_ENOTDIR;
  574.         }
  575.         else
  576.         {
  577.             assert(sm_p->getattr.ref_type == PVFS_TYPE_METAFILE);
  578.             ret = ((attr->objtype == PVFS_TYPE_DIRECTORY) ?
  579.                    -PVFS_EISDIR : -PVFS_EBADF);
  580.         }
  581.         PVFS_perror_gossip("Object Type mismatch error", ret);
  582.         return ret;
  583.     }
  584.  
  585.     /* do assertion checking of getattr response values, and
  586.      * check if file sizes are needed.  With NDEBUG defined, this block
  587.      * only checks if file sizes are needed.
  588.      */
  589.     switch (attr->objtype)
  590.     {
  591.         case PVFS_TYPE_METAFILE:
  592.             if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
  593.                 PVFS_ATTR_META_DIST)
  594.             {
  595.                 /* if we requested distribution attrs, did the distribution 
  596.                  * get set and is the size valid?
  597.                  */
  598.                 assert(attr->mask & PVFS_ATTR_META_DIST);
  599.                 assert(attr->u.meta.dist && (attr->u.meta.dist_size > 0));
  600.             }
  601.             if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
  602.                 PVFS_ATTR_META_MIRROR_DFILES)
  603.             {
  604.                 if (attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
  605.                 {    assert(attr->u.meta.mirror_dfile_array &&
  606.                           (attr->u.meta.mirror_copies_count > 0));
  607.                      gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: Mirror handles and "
  608.                                                        "copy count retrieved.\n"
  609.                                                      ,__func__);
  610.                 }
  611.                 else
  612.                 {
  613.                    gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: request attribute "
  614.                    "mask says to get the mirror dfiles, if they exist, \nbut "
  615.                    "the response attribute mask says they were not retrieved. "
  616.                    "This is okay.\n"
  617.                    ,__func__);
  618.                 }
  619.             }
  620.             if (sm_p->msgarray_op.msgpair.req.u.getattr.attrmask &
  621.                 PVFS_ATTR_META_DFILES)
  622.             {
  623.                 /* if we requested the datafile handles for the file, did
  624.                  * the datafile array get populated?
  625.                  */
  626.                 assert(attr->u.meta.dfile_array &&
  627.                        (attr->u.meta.dfile_count > 0));
  628.  
  629.                 gossip_debug(GOSSIP_GETATTR_DEBUG,
  630.                              "getattr_object_getattr_comp_fn: "
  631.                              "%d datafiles.\n", attr->u.meta.dfile_count);
  632.                 
  633.                 /* if we need the size, that should be the only time we're 
  634.                  * going to have to do a full data file fetch. 
  635.                  * (that's expensive)
  636.                  */
  637.                 if (sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
  638.                 {
  639.                     /* is the file stuffed? */
  640.                     if(!(attr->mask & PVFS_ATTR_META_UNSTUFFED))
  641.                     {
  642.                         /* we can compute the size without doing any more
  643.                          * getattr requests
  644.                          */
  645.                         gossip_debug(GOSSIP_GETATTR_DEBUG,
  646.                             "getattr_object_getattr_comp_fn: "
  647.                             "detected stuffed file.\n");
  648.                         return(0);
  649.                     }
  650.                     /* if caller asked for the size, then we need
  651.                      * to jump to the datafile_getattr state, which
  652.                      * will retrieve the datafile sizes for us.
  653.                      */
  654.                     return GETATTR_NEED_DATAFILE_SIZES;
  655.                 }
  656.             }
  657.             return 0;
  658.         case PVFS_TYPE_DIRECTORY:
  659.         {
  660.             gossip_debug(GOSSIP_CLIENT_DEBUG,
  661.                 "getattr comp_fn [%p] "
  662.                 "dfile_count = %d "
  663.                 "dist_name_len = %d "
  664.                 "dist_params_len = %d\n",
  665.                 attr,
  666.                 attr->u.dir.hint.dfile_count,
  667.                 attr->u.dir.hint.dist_name_len,
  668.                 attr->u.dir.hint.dist_params_len);
  669.             return 0;
  670.         }
  671.         case PVFS_TYPE_SYMLINK:
  672.             return 0;
  673.         case PVFS_TYPE_DATAFILE:
  674.         return 0;
  675.         case PVFS_TYPE_DIRDATA:
  676.         return 0;
  677.         case PVFS_TYPE_INTERNAL:
  678.         return 0;
  679.         default:
  680.             gossip_err("error: getattr_object_getattr_comp_fn: "
  681.                        "handle refers to invalid object type\n");
  682.     }
  683.     return -PVFS_EINVAL;
  684. }
  685.  
  686.  
  687. static PINT_sm_action getattr_object_getattr_failure(
  688.         struct PINT_smcb *smcb, job_status_s *js_p)
  689. {
  690.     struct PINT_client_sm *sm_p __attribute__((unused)) = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  691.  
  692.     gossip_debug(GOSSIP_CLIENT_DEBUG
  693.                 ,"(%p) getattr state: getattr_object_getattr_failure\n"
  694.                 ,sm_p);
  695.  
  696.     if ((js_p->error_code != -PVFS_ENOENT) &&
  697.         (js_p->error_code != -PVFS_EINVAL))
  698.     {
  699.         PVFS_perror_gossip("getattr_object_getattr_failure ",
  700.                            js_p->error_code);
  701.     }
  702.  
  703.     return SM_ACTION_COMPLETE;
  704. }
  705.  
  706. /* NOTE:  This nested state machine allocates and stores the results in getattr.size_array.  So,
  707.  * if you call this state machine directly, do not allocate space prior to calling it to avoid a
  708.  * nasty memory leak.
  709. */
  710. static PINT_sm_action getattr_datafile_getattr_setup_msgpairarray(
  711.         struct PINT_smcb *smcb, job_status_s *js_p)
  712. {
  713.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  714.     PINT_sm_getattr_state *getattr = &(sm_p->getattr);
  715.     int ret = -PVFS_EINVAL;
  716.     int i = 0;
  717.     PVFS_object_attr *attr = NULL;
  718.     PINT_sm_msgpair_state *msg_p = NULL;
  719.     uint64_t mirror_retry = (sm_p->getattr.attr.mask & PVFS_ATTR_META_MIRROR_DFILES);
  720.  
  721.     gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing getattr_datafile_getattr_setup_msgpairarray...\n");
  722.     gossip_debug(GOSSIP_MIRROR_DEBUG,"%s: Are we mirroring? %s\n",__func__
  723.                                     ,(mirror_retry ? "YES" : "NO")); 
  724.     gossip_debug(GOSSIP_MIRROR_DEBUG,"%s: attr.mask:0x%08x \tmirror_retry:0x%08x\n"
  725.                 ,__func__
  726.                 ,sm_p->getattr.attr.mask
  727.                 ,(unsigned int)mirror_retry);
  728.  
  729.     js_p->error_code = 0;
  730.  
  731.     attr = &sm_p->getattr.attr;
  732.     assert(attr);
  733.  
  734.     PINT_msgpair_init(&sm_p->msgarray_op);
  735.     msg_p = &sm_p->msgarray_op.msgpair;
  736.  
  737.     /*initialize the size_array, which will hold the size of each datahandle*/
  738.     PINT_SM_DATAFILE_SIZE_ARRAY_INIT(&getattr->size_array,attr->u.meta.dfile_count);
  739.  
  740.     /* initialize mir_ctx_array: one context for each handle in the file */
  741.     getattr->mir_ctx_array = malloc(attr->u.meta.dfile_count *
  742.                                     sizeof(*getattr->mir_ctx_array));
  743.     if (!getattr->mir_ctx_array)
  744.     {
  745.         gossip_lerr("Unable to allocate mirror context array.\n");
  746.         js_p->error_code = -PVFS_ENOMEM;
  747.         return SM_ACTION_COMPLETE;
  748.     }
  749.     memset(getattr->mir_ctx_array,0,attr->u.meta.dfile_count * 
  750.                                    sizeof(*getattr->mir_ctx_array));
  751.     getattr->mir_ctx_count = attr->u.meta.dfile_count;
  752.  
  753.     for (i=0; i<getattr->mir_ctx_count; i++)
  754.     {
  755.        getattr->mir_ctx_array[i].original_datahandle = attr->u.meta.dfile_array[i];
  756.        getattr->mir_ctx_array[i].original_server_nr = i;
  757.     }
  758.  
  759.     /* allocate handle array and populate */
  760.     PVFS_handle *handles = malloc(sizeof(PVFS_handle) * attr->u.meta.dfile_count);
  761.     if (!handles)
  762.     {
  763.         gossip_lerr("Unable to allocation local handles array.\n");
  764.         js_p->error_code = -PVFS_ENOMEM;
  765.         return SM_ACTION_COMPLETE;
  766.     }
  767.     memset(handles,0,sizeof(PVFS_handle) * attr->u.meta.dfile_count);
  768.     memcpy(handles,attr->u.meta.dfile_array,attr->u.meta.dfile_count *
  769.                                             sizeof(PVFS_handle));
  770.  
  771.     /*allocate index-to-server array and populate*/
  772.     getattr->index_to_server = malloc(sizeof(uint32_t)*attr->u.meta.dfile_count);
  773.     if (!getattr->index_to_server)
  774.     {
  775.         gossip_lerr("Unable to allocate index-to-server array.\n");
  776.         js_p->error_code = -PVFS_ENOMEM;
  777.         return SM_ACTION_COMPLETE;
  778.     }
  779.     memset(getattr->index_to_server,0,sizeof(uint32_t)*attr->u.meta.dfile_count);
  780.     for (i=0; i<attr->u.meta.dfile_count; i++)
  781.     {
  782.         getattr->index_to_server[i] = (uint32_t)i;
  783.     }
  784.  
  785. //START_PROFILER(getattr_prof);
  786.     PINT_SERVREQ_TREE_GET_FILE_SIZE_FILL(
  787.             msg_p->req,
  788.             *sm_p->cred_p,
  789.             sm_p->getattr.object_ref.fs_id,
  790.             0,
  791.             attr->u.meta.dfile_count,
  792.             handles,
  793.             (mirror_retry ? 1 : 0),
  794.             sm_p->hints);
  795.  
  796.     msg_p->fs_id = sm_p->getattr.object_ref.fs_id;
  797.     msg_p->handle = handles[0];
  798.     msg_p->comp_fn = getattr_datafile_getattr_comp_fn;
  799.  
  800.     ret = PINT_cached_config_map_to_server(
  801.         &msg_p->svr_addr, msg_p->handle, msg_p->fs_id);
  802.     if (ret)
  803.     {
  804.         gossip_lerr("Unable to map server address for this handle(%llu) and "
  805.                     "filesystem(%d)\n"
  806.                    ,llu(msg_p->handle)
  807.                    ,msg_p->fs_id);
  808.         js_p->error_code = ret;
  809.         return SM_ACTION_COMPLETE;
  810.     }
  811.  
  812.     /* set retry flag based on mirroring option...if mirroring, we will handle
  813.      * retries from this machine; if not, msgpairarray will handle retries.
  814.     */
  815.     if (mirror_retry)
  816.     {
  817.         msg_p->retry_flag = PVFS_MSGPAIR_NO_RETRY;
  818.     }
  819.     else
  820.     {
  821.         msg_p->retry_flag = PVFS_MSGPAIR_RETRY;
  822.     }
  823.  
  824.  
  825.     PINT_sm_push_frame(smcb, 0, &sm_p->msgarray_op);
  826.     return SM_ACTION_COMPLETE;
  827. }
  828.  
  829. static int getattr_datafile_getattr_comp_fn(
  830.     void *v_p, struct PVFS_server_resp *resp_p, int index)
  831. {
  832.     PINT_smcb *smcb = v_p;
  833.     PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_MSGPAIR_PARENT_SM);
  834.     PINT_sm_msgpair_state *msg = &(sm_p->msgarray_op.msgarray[index]);
  835.     struct PVFS_servreq_tree_get_file_size *tree = 
  836.                                               &(msg->req.u.tree_get_file_size);
  837.  
  838.     PINT_sm_getattr_state *getattr = &(sm_p->getattr);
  839.     PINT_client_getattr_mirror_ctx *ctx = NULL;
  840.     uint32_t server_nr = 0;
  841.     int i = 0;
  842.  
  843.     if (resp_p->status)
  844.     {   /* tree request had a problem */
  845.         return resp_p->status;
  846.     }
  847.  
  848.     assert(resp_p->op == PVFS_SERV_TREE_GET_FILE_SIZE);
  849.  
  850.  
  851.     /* if we are mirroring, then we need to check the error code returned from
  852.      * each server. If an error is found, mirroring will try to get the size 
  853.      * from a different server.  Below, we are marking which handles completed 
  854.      * successfully, which tells mirroring NOT to retry them.
  855.     */
  856.     if ( getattr->attr.mask & PVFS_ATTR_META_MIRROR_DFILES)  
  857.     {
  858.        for (i=0; i<resp_p->u.tree_get_file_size.handle_count; i++)
  859.        {
  860.           if (resp_p->u.tree_get_file_size.error[i] != 0)
  861.           {   /* error retrieving size for this handle..we will retry it. */
  862.              continue;
  863.           }
  864.           server_nr = getattr->index_to_server[i];
  865.           sm_p->getattr.size_array[server_nr] = resp_p->u.tree_get_file_size.size[i];
  866.           ctx = &(getattr->mir_ctx_array[server_nr]);
  867.           ctx->msg_completed = 1; 
  868.  
  869.           /*For completed messages, update the size array with the file size
  870.            *just retrieved.
  871.           */
  872.           getattr->size_array[server_nr] = resp_p->u.tree_get_file_size.size[i];
  873.           gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: size[%d]:%lld \thandle:%llu\n"
  874.                                          ,__func__
  875.                                          ,i
  876.                                          ,llu(getattr->size_array[i])
  877.                                          ,llu(tree->handle_array[i]));
  878.        }/*end for*/
  879.     }
  880.     else
  881.     {
  882.      /* if we are NOT mirroring and an error is found for an individual handle, 
  883.       * then we must invalidate the size array and return an error code.
  884.      */
  885.        for (i=0; i<resp_p->u.tree_get_file_size.handle_count; i++)
  886.        {
  887.           gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: error[%d]:%d"
  888.                                                "\tsize[%d]:%d\n"
  889.                                            ,__func__
  890.                                            ,i
  891.                                            ,(int)resp_p->u.tree_get_file_size.error[i]
  892.                                            ,i
  893.                                            ,(int)resp_p->u.tree_get_file_size.size[i]);
  894.  
  895.           if (resp_p->u.tree_get_file_size.error[i] != 0)
  896.           {
  897.               gossip_debug(GOSSIP_GETATTR_DEBUG,"%s: error[%d] is %d\n"
  898.                                                ,__func__
  899.                                                ,i
  900.                                                ,resp_p->u.tree_get_file_size.error[i]);
  901.               memset(getattr->size_array,0,sizeof(*getattr->size_array));
  902.               return (resp_p->u.tree_get_file_size.error[i]);
  903.           }
  904.  
  905.           getattr->size_array[i] = resp_p->u.tree_get_file_size.size[i];
  906.        }/*end for*/
  907.  
  908.     }/*end if*/
  909.  
  910.     return(0);
  911. }/*end getattr_datafile_getattr_comp_fn*/
  912.  
  913. static PINT_sm_action getattr_datafile_getattr_retry(
  914.         struct PINT_smcb *smcb, job_status_s *js_p)
  915. {
  916.     struct PINT_client_sm   *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  917.     struct PVFS_object_attr *attr = &(sm_p->getattr.attr);
  918.     PVFS_metafile_attr *meta = &(attr->u.meta);
  919.     PINT_sm_getattr_state *getattr = &(sm_p->getattr);
  920.     PINT_client_getattr_mirror_ctx *ctx = NULL;
  921.     PINT_sm_msgarray_op *mop = &(sm_p->msgarray_op);
  922.     PINT_sm_msgpair_state *msg = &(mop->msgarray[0]);
  923.     char *enc_req_bytes = NULL;
  924.     struct PVFS_servreq_tree_get_file_size *tree = 
  925.                                               &(msg->req.u.tree_get_file_size);
  926.     uint32_t retry_msg_count = 0;
  927.     uint32_t index = 0;
  928.     uint32_t copies = 0;
  929.     uint32_t server_nr = 0;
  930.     int i   = 0;
  931.     int j   = 0;
  932.     int k   = 0;
  933.     int ret = 0;
  934.  
  935.     gossip_debug(GOSSIP_CLIENT_DEBUG, 
  936.                  "(%p) getattr state: "
  937.                  "getattr_datafile_getattr_retry\n", sm_p);
  938.  
  939.     /*We only need to retry if we have mirrors; otherwise, msgpairarray 
  940.      *has already handled the retries.
  941.     */
  942.     if (!(attr->mask & PVFS_ATTR_META_MIRROR_DFILES))  
  943.     {  
  944.        /*we are NOT mirroring.*/
  945.        return SM_ACTION_COMPLETE;
  946.     }
  947.     
  948.     /* How many handles need to be retried? */
  949.     for (i=0; i<tree->num_data_files; i++)
  950.     {
  951.         server_nr = getattr->index_to_server[i];
  952.         ctx = &(getattr->mir_ctx_array[server_nr]);
  953.         if (ctx->msg_completed == 0)
  954.             retry_msg_count++;
  955.     }
  956.  
  957.     /* no retries needed */
  958.     if (retry_msg_count == 0)
  959.     {
  960.         return SM_ACTION_COMPLETE;
  961.     }
  962.  
  963.     /* do we have any retries available? */
  964.     if (getattr->retry_count >= mop->params.retry_limit)
  965.     {
  966.         /* at this point, we have msgpairs that need to be retried, but we
  967.          * we have met our retry limit.  so, we must invalidate the size array,
  968.          * since we don't have all of the necessary sizes AND return an error.
  969.         */
  970.         memset(getattr->size_array,0,sizeof(*getattr->size_array) * getattr->size);
  971.         js_p->error_code = (js_p->error_code ? js_p->error_code : -PVFS_ETIME);
  972.         gossip_err("%s: Ran out of retries(%d)\n",__func__
  973.                                                  ,getattr->retry_count);
  974.         return SM_ACTION_COMPLETE;
  975.     }
  976.  
  977.     /*allocate temporary index-to-server array*/
  978.     uint32_t *tmp_server_nr = malloc(sizeof(uint32_t) * retry_msg_count);
  979.     if (!tmp_server_nr)
  980.     {
  981.         gossip_lerr("Unable to allocate temporary index-to-server array.\n");
  982.         js_p->error_code = -PVFS_ENOMEM;
  983.         return SM_ACTION_COMPLETE;
  984.     }
  985.     memset(tmp_server_nr,0,sizeof(uint32_t) * retry_msg_count);
  986.  
  987.     /*allocate temporary handle array*/
  988.     PVFS_handle *tmp_handles = malloc(sizeof(PVFS_handle) * retry_msg_count);
  989.     if (!tmp_handles)
  990.     {
  991.         gossip_lerr("Unable to allocate temporary handle array.\n");
  992.         js_p->error_code = -PVFS_ENOMEM;
  993.         return SM_ACTION_COMPLETE;
  994.     }
  995.     memset(tmp_handles,0,sizeof(PVFS_handle) * retry_msg_count);
  996.  
  997.     /* okay. let's setup new handles to retry. 
  998.     */
  999.     for (i=0,j=0; i<tree->num_data_files; i++)
  1000.     {
  1001.         server_nr = getattr->index_to_server[i];
  1002.         ctx = &(getattr->mir_ctx_array[server_nr]);
  1003.      
  1004.         /* don't process completed messages */
  1005.         if (ctx->msg_completed)
  1006.            continue;
  1007.  
  1008.         /* for incomplete messages, cleanup memory, if necessary */ 
  1009.         enc_req_bytes = (char *)&(msg->encoded_req);
  1010.         for (k=0; k<sizeof(msg->encoded_req); k++)
  1011.         {
  1012.            if (enc_req_bytes[k] != '\0')
  1013.            {
  1014.               PINT_encode_release(&(msg->encoded_req),PINT_ENCODE_REQ);
  1015.               break;
  1016.            }
  1017.         }/*end for*/
  1018.  
  1019.         if (msg->encoded_resp_p)
  1020.         {
  1021.             BMI_memfree(msg->svr_addr
  1022.                        ,msg->encoded_resp_p
  1023.                        ,msg->max_resp_sz
  1024.                        ,BMI_RECV);
  1025.         }
  1026.         
  1027.         /* Should we use the original datahandle? */
  1028.         if (ctx->retry_original)
  1029.         {
  1030.             ctx->retry_original = 0;
  1031.             tmp_handles[j]   = ctx->original_datahandle;
  1032.             tmp_server_nr[j] = ctx->original_server_nr;
  1033.             j++;
  1034.             continue;
  1035.         }/*end retry_original*/
  1036.  
  1037.         /* otherwise, get next mirrored handle.  note:  if a mirrored handle is
  1038.          * zero, then this means that the creation of this mirrored object 
  1039.          * failed for its particular server.  in this case, get the next valid 
  1040.          * handle.  as a last resort, retry the original handle.
  1041.         */
  1042.         copies = ctx->current_copies_count;
  1043.         for (;copies < meta->mirror_copies_count; copies++)
  1044.         {
  1045.             index = (copies*meta->dfile_count) + server_nr;
  1046.             if (meta->mirror_dfile_array[index] != 0)
  1047.             {  /* we have found a valid mirrored handle */
  1048.                tmp_handles[j]   = meta->mirror_dfile_array[index];
  1049.                tmp_server_nr[j] = server_nr;
  1050.                j++;
  1051.                break;
  1052.             }
  1053.         }
  1054.  
  1055.         /* if we haven't found a valid mirrored handle, retry the original
  1056.          * datahandle.
  1057.         */
  1058.         if ( copies == meta->mirror_copies_count )
  1059.         {
  1060.            tmp_handles[j]   = ctx->original_datahandle;
  1061.            tmp_server_nr[j] = ctx->original_server_nr;
  1062.            j++;
  1063.            ctx->retry_original = 0;
  1064.            ctx->current_copies_count = 0;
  1065.            getattr->retry_count++;
  1066.            continue;
  1067.         }/*end if we have to use the original*/
  1068.  
  1069.         /* otherwise, setup for the next retry event */
  1070.         ctx->current_copies_count++;
  1071.         if (ctx->current_copies_count == meta->mirror_copies_count)
  1072.         {
  1073.            ctx->current_copies_count = 0;
  1074.            ctx->retry_original = 1;
  1075.            getattr->retry_count++;
  1076.         }
  1077.     }/*end for each handle in the old request*/
  1078.  
  1079.     /*replace values in old tree request*/
  1080.     free(tree->handle_array);
  1081.     tree->handle_array = tmp_handles;
  1082.     tree->num_data_files = retry_msg_count;
  1083.     
  1084.     /*replace values in old message request*/
  1085.     msg->handle = tmp_handles[0];
  1086.     msg->svr_addr=0;
  1087.     ret = PINT_cached_config_map_to_server(&msg->svr_addr
  1088.                                           ,msg->handle
  1089.                                           ,msg->fs_id);
  1090.     if (ret)
  1091.     {
  1092.         gossip_lerr("Unable to determine server address for handle(%llu) and "
  1093.                     "file system(%d).\n"
  1094.                    ,llu(msg->handle)
  1095.                    ,msg->fs_id);
  1096.         js_p->error_code = -PVFS_EINVAL;
  1097.         return SM_ACTION_COMPLETE;
  1098.     }
  1099.  
  1100.     /*save index-to-server array*/
  1101.     free(getattr->index_to_server);
  1102.     getattr->index_to_server = tmp_server_nr;
  1103.  
  1104.     /* Push the msgarray_op and jump to msgpairarray.sm */
  1105.     PINT_sm_push_frame(smcb,0,mop);
  1106.     js_p->error_code=GETATTR_IO_RETRY;
  1107.  
  1108.     return SM_ACTION_COMPLETE;
  1109. } /*end datafile_getattr_retry*/
  1110.  
  1111.  
  1112.  
  1113.  
  1114.  
  1115.  
  1116. static PINT_sm_action getattr_datafile_getattr_cleanup(
  1117.         struct PINT_smcb *smcb, job_status_s *js_p)
  1118. {
  1119.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1120. //FINISH_PROFILER("getattr", getattr_prof, 1);
  1121.     PINT_sm_getattr_state *getattr = &(sm_p->getattr);
  1122.  
  1123.     /* cleanup handle_array created for the one tree request */
  1124.     PINT_sm_msgpair_state *msg_p = &(sm_p->msgarray_op.msgarray[0]);
  1125.     if (msg_p->req.u.tree_get_file_size.handle_array)
  1126.        free(msg_p->req.u.tree_get_file_size.handle_array);
  1127.     
  1128.     /* cleanup tree request */
  1129.     PINT_msgpairarray_destroy(&sm_p->msgarray_op);
  1130.  
  1131.     /* cleanup memory that may have been used for mirrored retries.*/
  1132.     if (getattr->mir_ctx_array)
  1133.        free(getattr->mir_ctx_array);
  1134.     if (getattr->index_to_server)
  1135.        free(getattr->index_to_server);
  1136.  
  1137.  
  1138.     return SM_ACTION_COMPLETE;
  1139. }
  1140.  
  1141. static PINT_sm_action getattr_acache_insert(
  1142.         struct PINT_smcb *smcb, job_status_s *js_p)
  1143. {   
  1144.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1145.     PVFS_size* tmp_size = NULL;
  1146.  
  1147.     if( sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE || 
  1148.         sm_p->getattr.attr.objtype == PVFS_TYPE_DIRECTORY ||
  1149.         sm_p->getattr.attr.objtype == PVFS_TYPE_SYMLINK )
  1150.     {
  1151.         /* see if we have a size value to cache */
  1152.         if (sm_p->getattr.attr.objtype == PVFS_TYPE_METAFILE &&
  1153.             sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
  1154.         {                                           
  1155.             if(!(sm_p->getattr.attr.mask & PVFS_ATTR_META_UNSTUFFED))
  1156.             {
  1157.                 /* stuffed file case */
  1158.                 sm_p->getattr.size = sm_p->getattr.attr.u.meta.stuffed_size;
  1159.                 tmp_size = &sm_p->getattr.size;
  1160.                 gossip_debug(GOSSIP_GETATTR_DEBUG, "getattr_acache_insert "
  1161.                   "calculated stuffed logical size of %lld\n", lld(*tmp_size));
  1162.             }
  1163.             else
  1164.             {
  1165.                 /* compute size as requested */
  1166.                 assert(sm_p->getattr.attr.u.meta.dist);
  1167.                 assert(sm_p->getattr.attr.u.meta.dist->methods &&
  1168.                        sm_p->getattr.attr.u.meta.dist->methods->logical_file_size);
  1169.  
  1170.                 sm_p->getattr.size =
  1171.                     (sm_p->getattr.attr.u.meta.dist->methods->logical_file_size)(
  1172.                     sm_p->getattr.attr.u.meta.dist->params,
  1173.                     sm_p->getattr.attr.u.meta.dfile_count,
  1174.                     sm_p->getattr.size_array);
  1175.  
  1176.                 tmp_size = &sm_p->getattr.size;
  1177.                 gossip_debug(GOSSIP_GETATTR_DEBUG,"getattr_acache_insert calculated"
  1178.                                                   " unstuffed logical size of %lld\n"
  1179.                                                  , lld(*tmp_size));
  1180.             }
  1181.         }
  1182.  
  1183.         PINT_acache_update(sm_p->getattr.object_ref,
  1184.             &sm_p->getattr.attr,
  1185.             tmp_size);
  1186.  
  1187.         gossip_debug(GOSSIP_CLIENT_DEBUG, "trying to add object "
  1188.                      "reference to acache\n");
  1189.     }
  1190.  
  1191.     return SM_ACTION_COMPLETE;
  1192. }
  1193.  
  1194. static PINT_sm_action getattr_cleanup(
  1195.         struct PINT_smcb *smcb, job_status_s *js_p)
  1196. {
  1197.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1198.     PINT_sm_getattr_state *getattr = &(sm_p->getattr);
  1199.  
  1200.     gossip_debug(GOSSIP_CLIENT_DEBUG,
  1201.                  "(%p) getattr state: getattr_cleanup\n", sm_p);
  1202.  
  1203.     gossip_debug(GOSSIP_GETATTR_DEBUG
  1204.                 ,"%s: js_p->error_code:%d \tgetattr->attr.mask:0x%08x\n"
  1205.                 ,__func__
  1206.                 ,js_p->error_code
  1207.                 ,getattr->attr.mask);
  1208.  
  1209.     sm_p->error_code = js_p->error_code;
  1210.  
  1211.     /* cleanup size array; is only allocated if datafile sizes are retrieved */
  1212.     if (getattr->size_array)
  1213.        free(getattr->size_array);
  1214.  
  1215.     /* cleanup getattr when an error occurs */
  1216.     if (js_p->error_code)
  1217.     {
  1218.       if (getattr->attr.mask & PVFS_ATTR_META_DFILES)
  1219.       {
  1220.          if (getattr->attr.u.meta.dfile_array)
  1221.             free(getattr->attr.u.meta.dfile_array);
  1222.       }
  1223.  
  1224.       if (getattr->attr.mask & PVFS_ATTR_META_MIRROR_DFILES)
  1225.       {
  1226.          if (getattr->attr.u.meta.mirror_dfile_array)
  1227.             free(getattr->attr.u.meta.mirror_dfile_array);
  1228.       }
  1229.  
  1230.       if (getattr->attr.mask & PVFS_ATTR_META_DIST)
  1231.       {
  1232.          PINT_dist_free(getattr->attr.u.meta.dist);
  1233.       }
  1234.     }/*end if error*/
  1235.  
  1236.     return SM_ACTION_COMPLETE;
  1237. }
  1238.  
  1239. static PINT_sm_action getattr_set_sys_response(
  1240.         struct PINT_smcb *smcb, job_status_s *js_p)
  1241. {
  1242.     struct PINT_client_sm *sm_p = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1243.     PVFS_sysresp_getattr * sysresp = NULL;
  1244.     PVFS_object_attr *attr = NULL;
  1245.  
  1246.     if(js_p->error_code != 0)
  1247.     {
  1248.         PINT_SET_OP_COMPLETE;
  1249.         return SM_ACTION_TERMINATE;
  1250.     }
  1251.  
  1252.     attr = &sm_p->getattr.attr;
  1253.     assert(attr);
  1254.     
  1255.     /* If we get to this state action, 
  1256.      * the getattr state machine was invoked, so
  1257.      * we can assume that one of the PVFS_[i]sys_getattr functions
  1258.      * was called, and the response field must be filled in for the
  1259.      * user.
  1260.      */
  1261.  
  1262.     sysresp = sm_p->u.getattr.getattr_resp_p;
  1263.     
  1264.     /*
  1265.      * if we retrieved a symlink target, copy it for the caller; this
  1266.      * target path will be handed all the way back up to the caller via
  1267.      * the PVFS_sys_attr object.  The caller of PVFS_[i]sys_getattr
  1268.      * must free it.
  1269.      */
  1270.     if(attr->objtype == PVFS_TYPE_SYMLINK &&
  1271.        attr->mask & PVFS_ATTR_SYMLNK_TARGET)
  1272.     {
  1273.         assert(attr->u.sym.target_path_len > 0);
  1274.         assert(attr->u.sym.target_path);
  1275.  
  1276.         sysresp->attr.link_target = strdup(attr->u.sym.target_path);
  1277.         if (!sysresp->attr.link_target)
  1278.         {
  1279.            js_p->error_code = -PVFS_ENOMEM;
  1280.            PINT_SET_OP_COMPLETE;
  1281.            return SM_ACTION_TERMINATE;
  1282.         }
  1283.     }
  1284.  
  1285.     if(attr->objtype == PVFS_TYPE_METAFILE) 
  1286.     {
  1287.        /* Copy if there are any special object specific flags */
  1288.        sysresp->attr.flags = attr->u.meta.hint.flags;
  1289.        /* special case for when users ask for dfile count */
  1290.        if (sm_p->getattr.req_attrmask & PVFS_ATTR_META_DFILES)
  1291.        {
  1292.            sysresp->attr.dfile_count = attr->u.meta.dfile_count;
  1293.        }
  1294.        if (sm_p->getattr.req_attrmask & PVFS_ATTR_META_MIRROR_DFILES)
  1295.        {
  1296.          sysresp->attr.mirror_copies_count = attr->u.meta.mirror_copies_count;
  1297.        }
  1298.     }
  1299.     if (attr->objtype == PVFS_TYPE_DIRECTORY)
  1300.     {
  1301.         gossip_debug(GOSSIP_CLIENT_DEBUG, "dfile_count: %d\n", 
  1302.             attr->u.dir.hint.dfile_count);
  1303.         gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_len = %d, "
  1304.             "dist_params_len = %d\n",
  1305.             attr->u.dir.hint.dist_name_len, attr->u.dir.hint.dist_params_len);
  1306.         sysresp->attr.dfile_count = attr->u.dir.hint.dfile_count;
  1307.         /* 
  1308.          * If we retrieved any extended attributes for the directory
  1309.          * in question, the caller's responsibility to free it up
  1310.          */
  1311.         if (attr->u.dir.hint.dist_name_len > 0 && 
  1312.             (sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_HINT))
  1313.         {
  1314.             sysresp->attr.dist_name = strdup(attr->u.dir.hint.dist_name);
  1315.             if (!sysresp->attr.dist_name)
  1316.             {
  1317.                 js_p->error_code = -PVFS_ENOMEM;
  1318.                 PINT_SET_OP_COMPLETE;
  1319.                 return SM_ACTION_TERMINATE;
  1320.             }
  1321.             gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_hint: %s\n"
  1322.                                             , sysresp->attr.dist_name);
  1323.         }
  1324.         if (attr->u.dir.hint.dist_params_len > 0 &&
  1325.             (sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_HINT))
  1326.         {
  1327.             sysresp->attr.dist_params = strdup(attr->u.dir.hint.dist_params);
  1328.             if (!sysresp->attr.dist_params)
  1329.             {
  1330.                 free(sysresp->attr.dist_name);
  1331.                 sysresp->attr.dist_name = NULL;
  1332.                 js_p->error_code = -PVFS_ENOMEM;
  1333.                 PINT_SET_OP_COMPLETE;
  1334.                 return SM_ACTION_TERMINATE;
  1335.             }
  1336.             gossip_debug(GOSSIP_CLIENT_DEBUG, "dist_name_params: %s\n"
  1337.                                             , sysresp->attr.dist_params);
  1338.         }
  1339.     }
  1340.  
  1341.     /* copy outgoing sys_attr fields from returned object_attr */
  1342.     sysresp->attr.owner = attr->owner;
  1343.     sysresp->attr.group = attr->group;
  1344.     sysresp->attr.perms = attr->perms;
  1345.     sysresp->attr.atime = attr->atime;
  1346.     sysresp->attr.mtime = attr->mtime;
  1347.     sysresp->attr.ctime = attr->ctime;
  1348.     sysresp->attr.mask  = PVFS_util_object_to_sys_attr_mask(attr->mask);
  1349.     sysresp->attr.size  = 0;
  1350.     sysresp->attr.objtype = attr->objtype;
  1351.  
  1352.     if (js_p->error_code == 0)
  1353.     {
  1354.         /* convert outgoing attribute mask based on what we got */
  1355.         sysresp->attr.mask = PVFS_util_object_to_sys_attr_mask(
  1356.             sm_p->getattr.attr.mask);
  1357.  
  1358.        if (sm_p->getattr.req_attrmask & PVFS_ATTR_DATA_SIZE)
  1359.         {
  1360.             if( attr->objtype == PVFS_TYPE_DATAFILE )
  1361.             {
  1362.                 sysresp->attr.size = attr->u.data.size;
  1363.             }
  1364.             else
  1365.             {
  1366.                 sysresp->attr.size = sm_p->getattr.size;
  1367.             }
  1368.  
  1369.             sysresp->attr.mask |= PVFS_ATTR_SYS_SIZE;
  1370.         }
  1371.  
  1372.         if(attr->mask & PVFS_ATTR_META_DIST)
  1373.         {
  1374.             /* we have enough information to set a block size */
  1375.             sysresp->attr.blksize = attr->u.meta.dist->methods->get_blksize(
  1376.                 attr->u.meta.dist->params);
  1377.             sysresp->attr.mask |= PVFS_ATTR_SYS_BLKSIZE;
  1378.         }
  1379.  
  1380.         /* if this is a symlink, add the link target */
  1381.         if (sm_p->getattr.req_attrmask & PVFS_ATTR_SYMLNK_TARGET)
  1382.         {
  1383.             sysresp->attr.mask |= PVFS_ATTR_SYS_LNK_TARGET;
  1384.         }
  1385.  
  1386.         if(sm_p->getattr.req_attrmask & PVFS_ATTR_DIR_DIRENT_COUNT)
  1387.         {
  1388.             sysresp->attr.dirent_count = attr->u.dir.dirent_count;
  1389.             sysresp->attr.mask |= PVFS_ATTR_SYS_DIRENT_COUNT;
  1390.         }
  1391.     }
  1392.     else
  1393.     {
  1394.         /* in case of failure, blank out response */ 
  1395.         memset(sm_p->u.getattr.getattr_resp_p,
  1396.                0, sizeof(PVFS_sysresp_getattr));
  1397.     }
  1398.  
  1399.     PINT_SM_GETATTR_STATE_CLEAR(sm_p->getattr);
  1400.  
  1401.     PINT_SET_OP_COMPLETE;
  1402.     return SM_ACTION_TERMINATE;
  1403. }
  1404.  
  1405. /*
  1406.  * Local variables:
  1407.  *  mode: c
  1408.  *  c-indent-level: 4
  1409.  *  c-basic-offset: 4
  1410.  * End:
  1411.  *
  1412.  * vim: ft=c ts=8 sts=4 sw=4 expandtab
  1413.  */
  1414.