home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / server / get-attr.sm < prev    next >
Text File  |  2010-12-10  |  60KB  |  1,862 lines

  1. /* 
  2.  * (C) 2001 Clemson University and The University of Chicago 
  3.  *
  4.  * See COPYING in top-level directory.
  5.  *
  6.  * Changes by Acxiom Corporation to add dirent_count field to attributes
  7.  * Copyright ⌐ Acxiom Corporation, 2005.
  8.  */
  9.  
  10. /* pvfs2_get_attr_sm
  11.  *
  12.  * This state machine handles incoming server getattr operations.  These
  13.  * are the operations sent by PVFS_sys_getattr() among others.
  14.  *
  15.  * The pvfs2_prelude_sm is responsible for reading the actual metadata
  16.  * to begin with, because it does this as part of the permission checking
  17.  * process.
  18.  */
  19.  
  20. #include <string.h>
  21. #include <assert.h>
  22.  
  23. #include "server-config.h"
  24. #include "pvfs2-server.h"
  25. #include "pvfs2-attr.h"
  26. #include "pvfs2-types.h"
  27. #include "pvfs2-types-debug.h"
  28. #include "pvfs2-util.h"
  29. #include "pint-util.h"
  30. #include "pvfs2-internal.h"
  31. #include "pint-cached-config.h"
  32.  
  33. static uint64_t UINT64_HIGH = 0xffffffffffffffffLL;
  34.  
  35. PINT_server_trove_keys_s Trove_Special_Keys[] =
  36. {
  37.     {"user.pvfs2.dist_name"    , SPECIAL_DIST_NAME_KEYLEN},
  38.     {"user.pvfs2.dist_params"  , SPECIAL_DIST_PARAMS_KEYLEN},
  39.     {"user.pvfs2.num_dfiles"   , SPECIAL_NUM_DFILES_KEYLEN},
  40.     {"user.pvfs2.meta_hint"    , SPECIAL_METAFILE_HINT_KEYLEN},
  41.     {"user.pvfs2.mirror.copies", SPECIAL_MIRROR_COPIES_KEYLEN},
  42.     {"user.pvfs2.mirror.handles", SPECIAL_MIRROR_HANDLES_KEYLEN},
  43.     {"user.pvfs2.mirror.status" , SPECIAL_MIRROR_STATUS_KEYLEN},
  44. };
  45.  
  46. enum
  47. {
  48.     STATE_METAFILE  = 7,
  49.     STATE_SYMLINK   = 9,
  50.     STATE_DIR       = 10,
  51.     STATE_DIR_HINT  = 11,
  52.     STATE_DONE      = 12,
  53.     SKIP_NEXT_STATE = 13,
  54. };
  55.  
  56. static void free_nested_getattr_data(struct PINT_server_op *s_op);
  57.  
  58. %%
  59.  
  60. nested machine pvfs2_get_attr_work_sm
  61. {
  62.     state verify_attribs
  63.     {
  64.         run getattr_verify_attribs;
  65.         STATE_SYMLINK => read_symlink_target;
  66.         STATE_METAFILE => read_metafile_hint;
  67.         STATE_DIR => get_dirdata_handle;
  68.         default => setup_resp;
  69.     }
  70.  
  71.     state read_symlink_target
  72.     {
  73.         run getattr_read_symlink_target;
  74.         default => setup_resp;
  75.     }
  76.  
  77.     state read_metafile_hint
  78.     {
  79.         run getattr_read_metafile_hint;
  80.         default => interpret_metafile_hint;
  81.     }
  82.  
  83.     state interpret_metafile_hint
  84.     {
  85.         run getattr_interpret_metafile_hint;
  86.         STATE_METAFILE => read_metafile_datafile_handles_if_required;
  87.         default => setup_resp;
  88.     }
  89.  
  90.     state read_metafile_datafile_handles_if_required
  91.     {
  92.         run getattr_read_metafile_datafile_handles_if_required;
  93.         success => datafile_handles_safety_check;
  94.         default => setup_resp;
  95.     }
  96.  
  97.     state datafile_handles_safety_check
  98.     {
  99.         run getattr_datafile_handles_safety_check;
  100.         success => read_mirrored_copies_count_if_required; 
  101.         default => setup_resp;
  102.     }
  103.  
  104.     state read_mirrored_copies_count_if_required
  105.     {
  106.         run getattr_read_mirrored_copies_count_if_required;
  107.         SKIP_NEXT_STATE => read_metafile_distribution_if_required;
  108.         default => read_mirrored_handles_if_required;
  109.     }
  110.  
  111.     state read_mirrored_handles_if_required
  112.     {
  113.         run getattr_read_mirrored_handles_if_required;
  114.         SKIP_NEXT_STATE => read_metafile_distribution_if_required;
  115.         default => mirrored_handles_safety_check;
  116.     }
  117.  
  118.     state mirrored_handles_safety_check
  119.     {
  120.         run getattr_mirrored_handles_safety_check;
  121.         success => read_metafile_distribution_if_required;
  122.         default => setup_resp;
  123.     }
  124.  
  125.     state read_metafile_distribution_if_required
  126.     {
  127.         run getattr_read_metafile_distribution_if_required;
  128.         default => interpret_metafile_distribution;
  129.     }
  130.  
  131.     state interpret_metafile_distribution
  132.     {
  133.         run interpret_metafile_distribution;
  134.         success => detect_stuffed;
  135.         default => setup_resp;
  136.     }
  137.  
  138.     state detect_stuffed
  139.     {
  140.         run getattr_detect_stuffed;
  141.         default => read_stuffed_size;
  142.     }
  143.  
  144.     state read_stuffed_size
  145.     {
  146.         run getattr_read_stuffed_size;
  147.         success => interpret_stuffed_size;
  148.         default => setup_resp;
  149.     }
  150.  
  151.     state interpret_stuffed_size
  152.     {
  153.         run getattr_interpret_stuffed_size;
  154.         default => setup_resp;
  155.     }
  156.  
  157.     state get_dirdata_handle
  158.     {
  159.         run getattr_get_dirdata_handle;
  160.         success => get_dirent_count;
  161.         default => setup_resp;
  162.     }
  163.  
  164.     state get_dirent_count
  165.     {
  166.         run getattr_get_dirent_count;
  167.         STATE_DIR_HINT => get_dir_hint;
  168.         default => interpret_dirent_count;
  169.     }
  170.  
  171.     state interpret_dirent_count
  172.     {
  173.         run getattr_interpret_dirent_count;
  174.         default => get_dir_hint;
  175.     }
  176.  
  177.     state get_dir_hint
  178.     {
  179.         run getattr_get_dir_hint;
  180.         STATE_DONE => setup_resp;
  181.         default => interpret_dir_hint;
  182.     }
  183.  
  184.     state interpret_dir_hint
  185.     {
  186.         run getattr_interpret_dir_hint;
  187.         default => setup_resp;
  188.     }
  189.  
  190.     state setup_resp
  191.     {
  192.         run getattr_setup_resp;
  193.         default => return;
  194.     }
  195. }
  196.  
  197. nested machine pvfs2_get_attr_with_prelude_sm
  198. {
  199.     state init
  200.     {
  201.         run getattr_with_prelude_init;
  202.         default => prelude;
  203.     }
  204.  
  205.     state prelude
  206.     {
  207.         jump pvfs2_prelude_sm;
  208.         success => setup_op;
  209.         default => return;
  210.     }
  211.  
  212.     state setup_op
  213.     {
  214.         run getattr_setup_op;
  215.         default => do_work;
  216.     }
  217.  
  218.     state do_work
  219.     {
  220.         jump pvfs2_get_attr_work_sm;
  221.         default => return;
  222.     }
  223. }
  224.  
  225. machine pvfs2_get_attr_sm
  226. {
  227.     state work
  228.     {
  229.         jump pvfs2_get_attr_with_prelude_sm;
  230.         default => final_response;
  231.     }
  232.  
  233.     state final_response
  234.     {
  235.         jump pvfs2_final_response_sm;
  236.         default => cleanup;
  237.     }
  238.  
  239.     state cleanup
  240.     {
  241.         run getattr_cleanup;
  242.         default => terminate;
  243.     }
  244. }
  245.  
  246. %%
  247.  
  248.  
  249.  
  250. /* getattr_verify_attribs()
  251.  *
  252.  * We initialize the attribute mask that will be returned in this
  253.  * function.  This mask can be augmented in some of the other states.
  254.  */
  255. static PINT_sm_action getattr_verify_attribs(
  256.         struct PINT_smcb *smcb, job_status_s *js_p)
  257. {
  258.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  259.     PVFS_object_attr *resp_attr = NULL;
  260.  
  261.     js_p->error_code = 0;
  262.  
  263.     /*
  264.       explicitly copy basic attributes structure (read in from the
  265.       prelude.sm for the matching dspace) into response to be sent
  266.       back to the client.  this is mostly for readability here to be
  267.       sure we know which fields are valid in the response at this
  268.       point.
  269.     */
  270.     resp_attr = &s_op->resp.u.getattr.attr;
  271.     memset(resp_attr, 0, sizeof(PVFS_object_attr));
  272.  
  273.     resp_attr->owner = s_op->attr.owner;
  274.     resp_attr->group = s_op->attr.group;
  275.     resp_attr->perms = s_op->attr.perms;
  276.     resp_attr->atime = s_op->attr.atime;
  277.  
  278.     resp_attr->mtime = PINT_util_mkversion_time(s_op->attr.mtime);
  279.     if (resp_attr->mtime == 0)
  280.     {
  281.         /*
  282.           this is a compatibility hack to allow existing storage
  283.           spaces to be automagically converted to this versioned time
  284.           on-disk format slowly over time and doing the right thing in
  285.           the meantime
  286.         */
  287.         resp_attr->mtime = s_op->attr.mtime;
  288.  
  289.         gossip_debug(GOSSIP_GETATTR_DEBUG, " No version found!  Using "
  290.                      "mtime %llu\n", llu(resp_attr->mtime));
  291.     }
  292.     else
  293.     {
  294.         gossip_debug(
  295.             GOSSIP_GETATTR_DEBUG, " VERSION is %llu, mtime is %llu\n",
  296.             llu(s_op->attr.mtime), llu(resp_attr->mtime));
  297.     }
  298.  
  299.     resp_attr->ctime = s_op->attr.ctime;
  300.     resp_attr->mask = s_op->attr.mask;
  301.     resp_attr->objtype = s_op->attr.objtype;
  302.     resp_attr->u.meta.dfile_count = s_op->attr.u.meta.dfile_count;
  303.     resp_attr->u.meta.dist_size = s_op->attr.u.meta.dist_size;
  304.  
  305. #if 0
  306.     gossip_debug(
  307.         GOSSIP_GETATTR_DEBUG,
  308.         "+  _DSPACE_ retrieved attrs: [owner = %d, group = %d\n\t"
  309.         "perms = %o, type = %d, atime = %llu, mtime = %llu\n\t"
  310.         "ctime = %llu, dfile_count = %d, dist_size = %d]\n",
  311.         resp_attr->owner, resp_attr->group, resp_attr->perms,
  312.         resp_attr->objtype, llu(resp_attr->atime),
  313.         llu(resp_attr->mtime), llu(resp_attr->ctime),
  314.         (int)resp_attr->u.meta.dfile_count,
  315.         (int)resp_attr->u.meta.dist_size);
  316. #endif
  317.  
  318.     /*
  319.       weed out the attr mask of the response based on what the client
  320.       request asked for.  also, check if we need to retrieve more
  321.       information before returning the response to the client (by
  322.       guiding the state machine to get it).
  323.  
  324.       we can safely do this now that we have the type of the object
  325.       (read in from the dspace, not stored in the resp_attr), and we
  326.       have the original client request attr mask
  327.       (s_op->u.getattr.attrmask).
  328.     */
  329.     switch(resp_attr->objtype)
  330.     {
  331.         case PVFS_TYPE_METAFILE:
  332.             PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: metafile\n");
  333.             gossip_debug(GOSSIP_GETATTR_DEBUG,
  334.                          "  Req handle %llu refers to a metafile\n",
  335.                          llu(s_op->u.getattr.handle));
  336.  
  337.             if (s_op->u.getattr.attrmask & PVFS_ATTR_META_DFILES)
  338.             {
  339.                 gossip_debug(GOSSIP_GETATTR_DEBUG,
  340.                              " dspace has dfile_count of %d\n",
  341.                              resp_attr->u.meta.dfile_count);
  342.                 resp_attr->mask |= PVFS_ATTR_META_DFILES;
  343.             }
  344.             else
  345.             {
  346.                 gossip_debug(GOSSIP_GETATTR_DEBUG, " client doesn't want "
  347.                              "dfile info, clearing response attr mask\n");
  348.                 resp_attr->mask &= ~PVFS_ATTR_META_DFILES;
  349.             }
  350.  
  351.             if (s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST)
  352.             {
  353.                 gossip_debug(GOSSIP_GETATTR_DEBUG,
  354.                              " dspace has dist size of %d\n",
  355.                              resp_attr->u.meta.dist_size);
  356.  
  357.                 resp_attr->mask |= PVFS_ATTR_META_DIST;
  358.             }
  359.             else
  360.             {
  361.                 gossip_debug(GOSSIP_GETATTR_DEBUG, " client doesn't want "
  362.                              "dist info, clearing response attr mask\n");
  363.  
  364.                 resp_attr->mask &= ~PVFS_ATTR_META_DIST;
  365.             }
  366.  
  367.             if (s_op->u.getattr.attrmask & PVFS_ATTR_META_MIRROR_DFILES)
  368.             {
  369.                gossip_debug(GOSSIP_GETATTR_DEBUG,"client wants mirrored "
  370.                                                  "handles.\n");
  371.                resp_attr->mask |= PVFS_ATTR_META_MIRROR_DFILES;
  372.                resp_attr->u.meta.mirror_copies_count = 0;
  373.                resp_attr->u.meta.mirror_dfile_array  = NULL;
  374.             }
  375.             else
  376.             {
  377.                gossip_debug(GOSSIP_GETATTR_DEBUG,"client doesn't want "
  378.                                                  "mirrored handles.\n");
  379.                resp_attr->mask &= ~(PVFS_ATTR_META_MIRROR_DFILES);
  380.             }
  381.             js_p->error_code = STATE_METAFILE;
  382.             break;
  383.         case PVFS_TYPE_DATAFILE:
  384.             PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: datafile\n");
  385.             /*
  386.               note: the prelude already retrieved the size for us, so
  387.               there's no special action that needs to be taken if we have
  388.               a datafile here (other than adjusting our mask to include
  389.               the data information and copying the retrieved size from the
  390.               ds_attribute the prelude used)
  391.             */
  392.             resp_attr->u.data.size = s_op->ds_attr.u.datafile.b_size;
  393.             resp_attr->mask |= PVFS_ATTR_DATA_ALL;
  394.  
  395.             gossip_debug(GOSSIP_GETATTR_DEBUG, "  handle %llu refers to "
  396.                          "a datafile (size = %lld).\n",
  397.                          llu(s_op->u.getattr.handle),
  398.                          lld(resp_attr->u.data.size));
  399.             break;
  400.         case PVFS_TYPE_DIRECTORY:
  401.             PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: directory\n");
  402.             if (s_op->u.getattr.attrmask & PVFS_ATTR_DIR_DIRENT_COUNT)
  403.             {
  404.                 gossip_debug(GOSSIP_GETATTR_DEBUG,
  405.                              " getattr: dirent_count needed.\n");
  406.                 assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
  407.                 resp_attr->mask |= PVFS_ATTR_DIR_DIRENT_COUNT;
  408.                 js_p->error_code = STATE_DIR;
  409.             }
  410.             else
  411.             {
  412.                 gossip_debug(GOSSIP_GETATTR_DEBUG,
  413.                              " getattr: dirent_count not needed.\n");
  414.                 js_p->error_code = 0;
  415.                 assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
  416.             }
  417.             if (s_op->u.getattr.attrmask & PVFS_ATTR_DIR_HINT)
  418.             {
  419.                 gossip_debug(GOSSIP_GETATTR_DEBUG,
  420.                             " getattr: dfile_count needed.\n");
  421.                 assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
  422.                 resp_attr->mask |= PVFS_ATTR_DIR_HINT;
  423.                 js_p->error_code = STATE_DIR;
  424.             }
  425.             else
  426.             {
  427.                 gossip_debug(GOSSIP_GETATTR_DEBUG,
  428.                             " getattr: dfile_count not needed\n");
  429.                 assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
  430.             }
  431.             break;
  432.         case PVFS_TYPE_DIRDATA:
  433.             PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: dirdata\n");
  434.             gossip_debug(
  435.                 GOSSIP_GETATTR_DEBUG, "  handle %llu refers to "
  436.                 "a dirdata object. doing nothing special\n",
  437.                 llu(s_op->u.getattr.handle));
  438.             assert(resp_attr->mask & PVFS_ATTR_COMMON_ALL);
  439.             break;
  440.         case PVFS_TYPE_SYMLINK:
  441.             PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: symlink\n");
  442.             gossip_debug(
  443.                 GOSSIP_GETATTR_DEBUG, "  handle %llu refers to a symlink.\n",
  444.                 llu(s_op->u.getattr.handle));
  445.  
  446.             /*
  447.               we'll definitely have to fetch the symlink target in this
  448.               case, as the prelude will never retrieve it for us
  449.             */
  450.             js_p->error_code = STATE_SYMLINK;
  451.             break;
  452.         case PVFS_TYPE_INTERNAL:
  453.             PINT_ACCESS_DEBUG(s_op, GOSSIP_ACCESS_DEBUG, "type: symlink\n");
  454.             /* nothing interesting to add; this is meaningless to a client */
  455.             break;
  456.         default:
  457.             /* if we don't understand the object type, then it probably indicates
  458.              * a bug or some data corruption.  All trove objects should have a
  459.              * type set.
  460.              */
  461.             gossip_err(
  462.                 "Error: got unknown type when verifying attributes for "
  463.                 "handle %llu.\n", 
  464.                 llu(s_op->u.getattr.handle));
  465.             js_p->error_code = -PVFS_ENXIO;
  466.             break;
  467.     }
  468.  
  469.     return SM_ACTION_COMPLETE;
  470. }
  471.  
  472. static PINT_sm_action getattr_read_symlink_target(
  473.         struct PINT_smcb *smcb, job_status_s *js_p)
  474. {
  475.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  476.     int ret;
  477.     job_id_t i;
  478.  
  479.     /* if we don't need to fill in the symlink target, skip it */
  480.     if (!(s_op->u.getattr.attrmask & PVFS_ATTR_SYMLNK_TARGET))
  481.     {
  482.         gossip_debug(GOSSIP_GETATTR_DEBUG, "skipping symlink target read\n");
  483.         js_p->error_code = 0;
  484.         return SM_ACTION_COMPLETE;
  485.     }
  486.     
  487.     s_op->key.buffer    = Trove_Common_Keys[SYMLINK_TARGET_KEY].key;
  488.     s_op->key.buffer_sz = Trove_Common_Keys[SYMLINK_TARGET_KEY].size;
  489.  
  490.     /*
  491.       optimistically add mask value to indicate the symlink target is
  492.       filled (error_code is checked in next state)
  493.     */
  494.     s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_SYMLNK_TARGET;
  495.  
  496.     s_op->resp.u.getattr.attr.u.sym.target_path_len = PVFS_NAME_MAX;
  497.     s_op->resp.u.getattr.attr.u.sym.target_path =
  498.     malloc(s_op->resp.u.getattr.attr.u.sym.target_path_len);
  499.     if (!s_op->resp.u.getattr.attr.u.sym.target_path)
  500.     {
  501.     js_p->error_code = -PVFS_ENOMEM;
  502.     return SM_ACTION_COMPLETE;
  503.     }
  504.  
  505.     if(s_op->free_val)
  506.     {
  507.         free(s_op->val.buffer);
  508.     }
  509.     s_op->val.buffer = s_op->resp.u.getattr.attr.u.sym.target_path;
  510.     s_op->val.buffer_sz = s_op->resp.u.getattr.attr.u.sym.target_path_len;
  511.     /* this will get cleaned up with attr structure */
  512.     s_op->free_val = 0;
  513.  
  514.     ret = job_trove_keyval_read(
  515.         s_op->u.getattr.fs_id, s_op->u.getattr.handle,
  516.         &s_op->key, &s_op->val,
  517.         0,
  518.         NULL, smcb, 0, js_p,
  519.         &i, server_job_context, s_op->req->hints);
  520.  
  521.  
  522.     return ret;
  523. }
  524.  
  525. static PINT_sm_action getattr_interpret_metafile_hint(
  526.     PINT_smcb *smcb, job_status_s *js_p)
  527. {
  528.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  529.     PVFS_object_attr *resp_attr = NULL;
  530.     PVFS_metafile_attr *meta = &(s_op->resp.u.getattr.attr.u.meta); 
  531.     resp_attr = &s_op->resp.u.getattr.attr;
  532.  
  533.     assert(resp_attr->objtype == PVFS_TYPE_METAFILE);
  534.  
  535.     if (js_p->error_code == 0 || js_p->error_code == -TROVE_ENOENT)
  536.     {
  537.         if (js_p->error_code == 0)
  538.         {
  539.             memcpy(&(meta->hint), s_op->val.buffer, sizeof(meta->hint));
  540.         }
  541.         if ((resp_attr->mask & PVFS_ATTR_META_DFILES) ||
  542.             (resp_attr->mask & PVFS_ATTR_META_DIST)   ||
  543.             (resp_attr->mask & PVFS_ATTR_META_MIRROR_DFILES))
  544.         {
  545.             gossip_debug(GOSSIP_GETATTR_DEBUG, " * client wants extra "
  546.                          "meta info, about to retrieve it now\n");
  547.             js_p->error_code = STATE_METAFILE;
  548.             if ( (resp_attr->mask  & PVFS_ATTR_META_MIRROR_DFILES) && 
  549.                 !(meta->hint.flags & PVFS_MIRROR_FL) ) 
  550.                   resp_attr->mask &= ~(PVFS_ATTR_META_MIRROR_DFILES);
  551.         }
  552.         else
  553.         {
  554.             gossip_debug(GOSSIP_GETATTR_DEBUG, " * client doesn't want "
  555.                          "extra meta info, preparing response now\n");
  556.             js_p->error_code = 0;
  557.         }
  558.     } else {
  559.         /*If we hit an error the DIST & DFILES are no longer valid*/
  560.         resp_attr->mask &= ~PVFS_ATTR_META_DIST;
  561.         resp_attr->mask &= ~PVFS_ATTR_META_DFILES;
  562.         resp_attr->mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
  563.     }
  564.     return SM_ACTION_COMPLETE;
  565. }
  566.  
  567. static PINT_sm_action getattr_read_metafile_hint(
  568.     struct PINT_smcb *smcb, job_status_s *js_p)
  569. {
  570.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  571.     int ret = -PVFS_EINVAL;
  572.     job_id_t i;
  573.     char *buf = NULL;
  574.  
  575.     assert(s_op->attr.objtype == PVFS_TYPE_METAFILE);
  576.     buf = (char *) calloc(sizeof(PVFS_metafile_hint) + 1, 1);
  577.     if (buf == NULL) 
  578.     {
  579.         js_p->error_code = -PVFS_ENOMEM;
  580.         /*If we hit an error the DIST & DFILES are no longer valid*/
  581.         s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
  582.         s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
  583.         s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
  584.         return 1;
  585.     }
  586.  
  587.     js_p->error_code = 0;
  588.  
  589.     s_op->key.buffer = Trove_Special_Keys[METAFILE_HINT_KEY].key;
  590.     s_op->key.buffer_sz = Trove_Special_Keys[METAFILE_HINT_KEY].size;
  591.  
  592.     if(s_op->free_val)
  593.     {
  594.         free(s_op->val.buffer);
  595.     }
  596.     s_op->val.buffer = buf;
  597.     s_op->val.buffer_sz = sizeof(s_op->resp.u.getattr.attr.u.meta.hint) + 1;
  598.     s_op->free_val = 1;
  599.  
  600.     gossip_debug(GOSSIP_GETATTR_DEBUG,
  601.          "  reading metafile hint (coll_id = %d, "
  602.                  "handle = %llu, key = %s (%d), val_buf = %p (%d))\n",
  603.          s_op->u.getattr.fs_id,
  604.          llu(s_op->u.getattr.handle), (char *)s_op->key.buffer,
  605.          s_op->key.buffer_sz, s_op->val.buffer,
  606.          s_op->val.buffer_sz);
  607.  
  608.     ret = job_trove_keyval_read(
  609.         s_op->u.getattr.fs_id, s_op->u.getattr.handle,
  610.         &s_op->key, &s_op->val, 
  611.         0, 
  612.         NULL, smcb, 0, js_p,
  613.         &i, server_job_context, s_op->req->hints);
  614.  
  615.     return ret;
  616. }
  617.  
  618. static PINT_sm_action getattr_read_metafile_datafile_handles_if_required(
  619.         struct PINT_smcb *smcb, job_status_s *js_p)
  620. {
  621.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  622.     int ret = -PVFS_EINVAL;
  623.     int dfile_count = 0;
  624.     job_id_t i;
  625.  
  626.     assert(s_op->attr.objtype == PVFS_TYPE_METAFILE);
  627.  
  628.     js_p->error_code = 0;
  629.  
  630.     /* if we don't need to fill in the dfiles, skip them */
  631.     if (!(s_op->u.getattr.attrmask & PVFS_ATTR_META_DFILES))
  632.     {
  633.         gossip_debug(GOSSIP_GETATTR_DEBUG, "skipping data handle read\n");
  634.         return SM_ACTION_COMPLETE;
  635.     }
  636.  
  637.     dfile_count = s_op->resp.u.getattr.attr.u.meta.dfile_count;
  638.  
  639.     gossip_debug(GOSSIP_GETATTR_DEBUG,
  640.                  " request has dfile_count of %d | dspace has %d\n",
  641.                  s_op->resp.u.getattr.attr.u.meta.dfile_count,
  642.                  s_op->resp.u.getattr.attr.u.meta.dfile_count);
  643.  
  644.     /* verify that the retrieved dfile count is sane */
  645.     if (!PVFS_REQ_LIMIT_DFILE_COUNT_IS_VALID(dfile_count))
  646.     {
  647.         gossip_err("The requested dfile count of %d is invalid; "
  648.                    "aborting operation.\n", dfile_count);
  649.     gossip_err(
  650.             "+ attrs read from dspace: (owner = %d, group = %d, "
  651.             "perms = %o, type = %d\n   atime = %lld, mtime = %lld, "
  652.             "ctime = %lld |\n   dfile_count = %d | dist_size = %d)\n",
  653.             s_op->resp.u.getattr.attr.owner,
  654.             s_op->resp.u.getattr.attr.group, 
  655.             s_op->resp.u.getattr.attr.perms,
  656.             s_op->resp.u.getattr.attr.objtype, 
  657.             lld(s_op->resp.u.getattr.attr.atime),
  658.             lld(s_op->resp.u.getattr.attr.mtime), 
  659.             lld(s_op->resp.u.getattr.attr.ctime),
  660.             (int)s_op->resp.u.getattr.attr.u.meta.dfile_count,
  661.             (int)s_op->resp.u.getattr.attr.u.meta.dist_size);
  662.  
  663.     gossip_err("handle: %llu (%llx), fsid: %d.\n",
  664.         llu(s_op->u.getattr.handle), llu(s_op->u.getattr.handle),
  665.         (int)s_op->u.getattr.fs_id);
  666.  
  667.         /*If we hit an error the DIST & DFILES are no longer valid*/
  668.         s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
  669.         s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
  670.         s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
  671.         
  672.     js_p->error_code = -PVFS_EOVERFLOW;
  673.     return SM_ACTION_COMPLETE;
  674.     }
  675.  
  676.     s_op->key.buffer = Trove_Common_Keys[METAFILE_HANDLES_KEY].key;
  677.     s_op->key.buffer_sz = Trove_Common_Keys[METAFILE_HANDLES_KEY].size;
  678.  
  679.     /* add mask value to indicate the data file array is filled */
  680.     s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_META_DFILES;
  681.  
  682.     s_op->resp.u.getattr.attr.u.meta.dfile_array =
  683.         malloc(dfile_count * sizeof(PVFS_handle));
  684.     if (!s_op->resp.u.getattr.attr.u.meta.dfile_array)
  685.     {
  686.         gossip_err("Cannot allocate dfile array of count %d\n",
  687.                    dfile_count);
  688.     js_p->error_code = -PVFS_ENOMEM;
  689.     return SM_ACTION_COMPLETE;
  690.     }
  691.  
  692.     if(s_op->free_val)
  693.     {
  694.         free(s_op->val.buffer);
  695.     }
  696.     s_op->val.buffer = s_op->resp.u.getattr.attr.u.meta.dfile_array;
  697.     s_op->val.buffer_sz = (dfile_count * sizeof(PVFS_handle));
  698.     /* this will get cleaned up with attr structure */
  699.     s_op->free_val = 0;
  700.  
  701.     gossip_debug(GOSSIP_GETATTR_DEBUG,
  702.          "  reading %d datafile handles (coll_id = %d, "
  703.                  "handle = %llu, key = %s (%d), val_buf = %p (%d))\n",
  704.          dfile_count, s_op->u.getattr.fs_id,
  705.          llu(s_op->u.getattr.handle), (char *)s_op->key.buffer,
  706.          s_op->key.buffer_sz, s_op->val.buffer,
  707.          s_op->val.buffer_sz);
  708.  
  709.     ret = job_trove_keyval_read(
  710.         s_op->u.getattr.fs_id
  711.        ,s_op->u.getattr.handle
  712.        ,&s_op->key
  713.        ,&s_op->val
  714.        ,0
  715.        ,NULL
  716.        ,smcb
  717.        ,0
  718.        ,js_p
  719.        ,&i
  720.        ,server_job_context
  721.        ,s_op->req->hints);
  722.  
  723.     return ret;
  724. }
  725.  
  726.  
  727. static PINT_sm_action getattr_read_mirrored_copies_count_if_required(
  728.         struct PINT_smcb *smcb, job_status_s *js_p)
  729. {
  730.     gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s...\n",__func__);
  731.  
  732.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  733.     struct PVFS_server_resp *resp = &(s_op->resp);
  734.     PVFS_metafile_attr *meta =  &(resp->u.getattr.attr.u.meta);
  735.     int ret = -PVFS_EINVAL;
  736.     job_id_t job_id;
  737.  
  738.    /* Are we mirroring? */
  739.     if (!(resp->u.getattr.attr.mask & PVFS_ATTR_META_MIRROR_DFILES))
  740.     {
  741.         gossip_debug(GOSSIP_MIRROR_DEBUG,"\tMirroring is NOT turned on "
  742.                                          "for this handle(%llu)..\n"
  743.                                         ,llu(s_op->u.getattr.handle));
  744.         js_p->error_code = SKIP_NEXT_STATE;
  745.  
  746.         return SM_ACTION_COMPLETE;
  747.     }
  748.  
  749.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tMirroring IS turned on for this "
  750.                                      "handle(%llu)...\n"
  751.                                     ,llu(s_op->u.getattr.handle));
  752.  
  753.     js_p->error_code = 0;
  754.  
  755.     /* setup job to read user.pvfs2.mirror.copies */
  756.  
  757.     /* initialize */
  758.     if (s_op->free_val)
  759.        free(s_op->val.buffer);
  760.     memset(&(s_op->val),0,sizeof(s_op->val));
  761.     memset(&(s_op->key),0,sizeof(s_op->key));
  762.  
  763.     /* set key = user.pvfs2.mirror.copies */
  764.     s_op->key.buffer    = Trove_Special_Keys[MIRROR_COPIES_KEY].key;
  765.     s_op->key.buffer_sz = Trove_Special_Keys[MIRROR_COPIES_KEY].size;
  766.  
  767.     /* setup space for retrieved value */
  768.     meta->mirror_copies_count = 0;
  769.     meta->mirror_dfile_array  = NULL;
  770.     s_op->val.buffer = &(meta->mirror_copies_count);
  771.     s_op->val.buffer_sz = sizeof(meta->mirror_copies_count);
  772.     s_op->free_val = 0;
  773.  
  774.     /* submit job to read the value */
  775.     ret = job_trove_keyval_read(
  776.         s_op->u.getattr.fs_id
  777.        ,s_op->u.getattr.handle
  778.        ,&s_op->key
  779.        ,&s_op->val
  780.        ,0
  781.        ,NULL
  782.        ,smcb
  783.        ,0
  784.        ,js_p
  785.        ,&job_id
  786.        ,server_job_context
  787.        ,s_op->req->hints);
  788.  
  789.     return ret;
  790. }/*end getattr_read_mirrored_copies_if_required*/
  791.  
  792.  
  793.  
  794. static PINT_sm_action getattr_read_mirrored_handles_if_required(
  795.         struct PINT_smcb *smcb, job_status_s *js_p)
  796. {
  797.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s ...\n",__func__);
  798.  
  799.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  800.     struct PVFS_server_resp *resp = &(s_op->resp);
  801.     PVFS_metafile_attr *meta =  &(resp->u.getattr.attr.u.meta);
  802.     int ret = -PVFS_EINVAL;
  803.     job_id_t job_id;
  804.     int i;
  805.  
  806.    /* Did we find mirror.copies? */
  807.     if (js_p->error_code < 0)
  808.     {
  809.         gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNumber of mirrored copies cannot "
  810.                                          "be retrieved.\n");
  811.         if (resp->u.getattr.attr.mask & (PVFS_ATTR_META_DFILES |
  812.                                          PVFS_ATTR_META_DIST) )
  813.         {
  814.             resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
  815.             js_p->error_code = SKIP_NEXT_STATE;
  816.         } else {
  817.             resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
  818.             resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
  819.             resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
  820.         }
  821.         memset(&(s_op->key),0,sizeof(s_op->key));
  822.         memset(&(s_op->val),0,sizeof(s_op->val));
  823.         s_op->free_val = 0;
  824.  
  825.         return SM_ACTION_COMPLETE;
  826.     }
  827.  
  828.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNumber of mirrored copies count "
  829.                                      "successfully retrieved.\n");
  830.  
  831.    /* check number of mirrored copies */
  832.     if (meta->mirror_copies_count == 0)
  833.     {
  834.        gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNumber of mirrored copies "
  835.                                         "is ZERO.\n");
  836.        gossip_lerr("Mirror handles requested, but number of mirrored copies "
  837.                    "is zero.\n");
  838.        resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
  839.        js_p->error_code = SKIP_NEXT_STATE;
  840.        return SM_ACTION_COMPLETE;
  841.     }
  842.  
  843.     gossip_debug(GOSSIP_MIRROR_DEBUG,"\tNumber of mirrored copies "
  844.                                      "retrieved : %d\n"
  845.                                     ,meta->mirror_copies_count);
  846.  
  847.  
  848.    /* check to see if total number of mirrored handles is sane */
  849.    if ( (meta->dfile_count * meta->mirror_copies_count) >
  850.          PVFS_REQ_LIMIT_MIRROR_DFILE_COUNT )
  851.    {
  852.        gossip_lerr("Number of mirrored handles(%d) exceeds the system "
  853.                    "limit(%d)\n"
  854.                   ,meta->dfile_count * meta->mirror_copies_count
  855.                   ,PVFS_REQ_LIMIT_MIRROR_DFILE_COUNT); 
  856.        resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
  857.        js_p->error_code = SKIP_NEXT_STATE;
  858.        return SM_ACTION_COMPLETE;
  859.    }
  860.  
  861.    js_p->error_code = 0;
  862.    /* get mirrored handles and status of each handle */
  863.  
  864.    /* initialize */
  865.    if (s_op->free_val)
  866.       free(s_op->val.buffer);
  867.    memset(&(s_op->key),0,sizeof(s_op->key));
  868.    memset(&(s_op->val),0,sizeof(s_op->val));
  869.  
  870.    for (i=0; i<s_op->keyval_count; i++)
  871.        if (s_op->val_a && s_op->val_a[i].buffer && s_op->free_val)
  872.            free(s_op->val_a[i].buffer);
  873.    if (s_op->val_a)
  874.        free(s_op->val_a);
  875.    if (s_op->key_a)
  876.        free(s_op->key_a);
  877.    if (s_op->error_a)
  878.       free(s_op->error_a);
  879.    s_op->free_val = 0;
  880.  
  881.    /* allocate space for keys and values */
  882.    s_op->keyval_count = 2;
  883.    s_op->free_val = 1;
  884.    s_op->key_a = s_op->val_a = NULL;
  885.    s_op->error_a = NULL;
  886.    
  887.    s_op->key_a   = malloc(sizeof(*s_op->key_a)   * s_op->keyval_count);
  888.    s_op->val_a   = malloc(sizeof(*s_op->val_a)   * s_op->keyval_count);
  889.    s_op->error_a = malloc(sizeof(*s_op->error_a) * s_op->keyval_count);
  890.    if (!s_op->key_a || !s_op->val_a || !s_op->error_a)
  891.    {
  892.       gossip_lerr("Cannot allocate memory for key/val/error.\n");
  893.       js_p->error_code = -PVFS_ENOMEM;
  894.       goto error_exit;
  895.    }
  896.    memset(s_op->key_a,0,sizeof(*s_op->key_a));
  897.    memset(s_op->val_a,0,sizeof(*s_op->val_a));
  898.  
  899.    /* set key = user.pvfs2.mirror.handles */
  900.    s_op->key_a[0].buffer    = Trove_Special_Keys[MIRROR_HANDLES_KEY].key;
  901.    s_op->key_a[0].buffer_sz = Trove_Special_Keys[MIRROR_HANDLES_KEY].size;
  902.  
  903.    /* setup buffer space for handles */
  904.    s_op->val_a[0].buffer = malloc(sizeof(PVFS_handle) * 
  905.                                   meta->dfile_count   *
  906.                                   meta->mirror_copies_count);
  907.    if (!s_op->val_a[0].buffer)
  908.    {
  909.       gossip_lerr("Cannot allocate memory for mirrored handles.\n");
  910.       js_p->error_code = -PVFS_ENOMEM;
  911.       goto error_exit;
  912.    }
  913.    memset(s_op->val_a[0].buffer,0,sizeof(PVFS_handle) *
  914.                                   meta->dfile_count   *
  915.                                   meta->mirror_copies_count);
  916.    s_op->val_a[0].buffer_sz = sizeof(PVFS_handle) *
  917.                               meta->dfile_count   *
  918.                               meta->mirror_copies_count;
  919.  
  920.    /* set key = user.pvfs2.mirror.status */
  921.    s_op->key_a[1].buffer    = Trove_Special_Keys[MIRROR_STATUS_KEY].key;
  922.    s_op->key_a[1].buffer_sz = Trove_Special_Keys[MIRROR_STATUS_KEY].size;
  923.  
  924.    /* setup buffer space for handle statuses */
  925.    s_op->val_a[1].buffer = malloc(sizeof(PVFS_handle) *
  926.                                   meta->dfile_count   *
  927.                                   meta->mirror_copies_count);
  928.    if (!s_op->val_a[1].buffer)
  929.    {
  930.        gossip_lerr("Cannot allocate memory for mirrored handle statuses.\n");
  931.        js_p->error_code = -PVFS_ENOMEM;
  932.        goto error_exit;
  933.    }
  934.    memset(s_op->val_a[1].buffer,0,sizeof(PVFS_handle) *
  935.                                   meta->dfile_count   *
  936.                                   meta->mirror_copies_count);
  937.    s_op->val_a[1].buffer_sz = sizeof(PVFS_handle) *
  938.                               meta->dfile_count   *
  939.                               meta->mirror_copies_count;
  940.  
  941.    
  942.    /* call job to retrieve the key/val pairs */
  943.    ret = job_trove_keyval_read_list(
  944.           s_op->u.getattr.fs_id
  945.          ,s_op->u.getattr.handle
  946.          ,s_op->key_a
  947.          ,s_op->val_a
  948.          ,s_op->error_a
  949.          ,s_op->keyval_count
  950.          ,0
  951.          ,NULL
  952.          ,smcb
  953.          ,0
  954.          ,js_p
  955.          ,&job_id
  956.          ,server_job_context
  957.          ,s_op->req->hints );
  958.  
  959.    return ret;
  960.  
  961. error_exit:
  962.    for (i=0; i<s_op->keyval_count; i++)
  963.    {
  964.        if (s_op->val_a && s_op->val_a[i].buffer)
  965.           free(s_op->val_a[i].buffer);
  966.    }
  967.    if (s_op->val_a)
  968.        free(s_op->val_a);
  969.    if (s_op->key_a)
  970.        free(s_op->key_a);
  971.    if (s_op->error_a)
  972.        free(s_op->error_a);
  973.    s_op->val_a = s_op->key_a = NULL;
  974.    s_op->error_a = NULL;
  975.    s_op->keyval_count = 0;
  976.    s_op->free_val = 0;
  977.  
  978.    resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
  979.  
  980.    return SM_ACTION_COMPLETE;
  981. }/*end getattr_read_mirrored_handles_if_required*/
  982.  
  983.  
  984.  
  985.  
  986. static PINT_sm_action getattr_mirrored_handles_safety_check(
  987.         struct PINT_smcb *smcb, job_status_s *js_p)
  988. {
  989.    gossip_debug(GOSSIP_MIRROR_DEBUG,"Executing %s ...\n",__func__);
  990.  
  991.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  992.     struct PVFS_server_resp *resp = &(s_op->resp);
  993.     PVFS_metafile_attr *meta =  &(resp->u.getattr.attr.u.meta);
  994.     int row,col,index;
  995.     int i;
  996.  
  997.  
  998.    js_p->error_code = 0;
  999.  
  1000.    /* Check the error code for each key/val pair. */
  1001.    for (i=0; i<s_op->keyval_count; i++)
  1002.    {
  1003.        if (s_op->error_a[i] != 0)
  1004.        {
  1005.            gossip_debug(GOSSIP_MIRROR_DEBUG,"\tRetrieval of key(%s) failed.\n"
  1006.                                            ,(char *)s_op->key_a[i].buffer);
  1007.            js_p->error_code = s_op->error_a[i];
  1008.        }
  1009.    }
  1010.  
  1011.    if (js_p->error_code)
  1012.    {
  1013.       goto error_exit;
  1014.    }
  1015.  
  1016.    gossip_debug(GOSSIP_MIRROR_DEBUG,"\tWe successfully retrieved handles and "
  1017.                                     "statuses.\n");
  1018.  
  1019.    for (i=0; i<s_op->keyval_count; i++)
  1020.    {  /* Did we get the data that we were expecting from 
  1021.        * user.pvfs2.mirror.handles(i=0) or 
  1022.        * user.pvfs2.mirror.statuses(i=1)? 
  1023.        */
  1024.  
  1025.      if (s_op->val_a[i].read_sz != s_op->val_a[i].buffer_sz)
  1026.      {
  1027.          gossip_lerr("Error: %s key found val size: %d when "
  1028.                     "expecting val size: %d\n"
  1029.                     ,(char *)s_op->key_a[i].buffer
  1030.                     ,s_op->val_a[i].read_sz
  1031.                     ,s_op->val_a[i].buffer_sz);
  1032.          js_p->error_code = s_op->val_a[i].buffer_sz;
  1033.      }
  1034.    }/*end for*/
  1035.  
  1036.    if (js_p->error_code)
  1037.    {
  1038.        goto error_exit;
  1039.    } 
  1040.  
  1041.    /*initialize permanent data structures*/
  1042.    meta->mirror_dfile_array = s_op->val_a[0].buffer;
  1043.    s_op->u.getattr.mirror_dfile_status_array = s_op->val_a[1].buffer;
  1044.    s_op->val_a[0].buffer = s_op->val_a[1].buffer = NULL;
  1045.  
  1046.    /* Check the mirroring status for each handle.  If the status is non-zero,
  1047.     * the handle is not valid, so put a null in the mirrory array for that
  1048.     * handle.  Otherwise, do nothing.
  1049.    */
  1050.    for (row=0; row<meta->mirror_copies_count; row++)
  1051.    {
  1052.       for (col=0; col<meta->dfile_count; col++)
  1053.       {
  1054.          index = (row*meta->dfile_count) + col;
  1055.          if ( s_op->u.getattr.mirror_dfile_status_array[index] == UINT64_HIGH )
  1056.             meta->mirror_dfile_array[index] = 0;
  1057.          gossip_debug(GOSSIP_MIRROR_DEBUG,
  1058.                       "\tmirror handle[%d]:%llu \t"
  1059.                       "status:%llu\n"
  1060.                       ,index
  1061.                       ,llu(meta->mirror_dfile_array[index])
  1062.                       ,llu(s_op->u.getattr.mirror_dfile_status_array[index]));
  1063.       }
  1064.    }
  1065.  
  1066.    /*Cleanup*/
  1067.    free(s_op->key_a);
  1068.    free(s_op->val_a);
  1069.    free(s_op->error_a);
  1070.    s_op->key_a = s_op->val_a = NULL;
  1071.    s_op->error_a = NULL;
  1072.    s_op->keyval_count = 0;
  1073.    s_op->free_val = 0;   
  1074.  
  1075.    js_p->error_code = 0;
  1076.    return SM_ACTION_COMPLETE;
  1077.  
  1078. error_exit:
  1079.    /* if we have an error, cleanup, and pretend that we never attempted
  1080.     * mirrors in the first place.
  1081.    */
  1082.       gossip_debug(GOSSIP_MIRROR_DEBUG,"\tCleaning up mirror operation...\n");
  1083.       js_p->error_code = 0;
  1084.       for (i=0; i<s_op->keyval_count; i++)
  1085.       {
  1086.           if (s_op->val_a[i].buffer)
  1087.               free(s_op->val_a[i].buffer);
  1088.       }
  1089.       free(s_op->key_a);
  1090.       free(s_op->val_a);
  1091.       free(s_op->error_a);
  1092.       s_op->key_a = s_op->val_a = NULL;
  1093.       s_op->error_a = NULL;
  1094.       s_op->keyval_count = 0; 
  1095.       
  1096.       /*We MUST set the number of copies to zero to prevent encoding errors
  1097.        *later.
  1098.       */
  1099.       meta->mirror_copies_count = 0;
  1100.       meta->mirror_dfile_array = NULL;
  1101.       s_op->u.getattr.mirror_dfile_status_array = NULL;
  1102.       resp->u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
  1103.       return SM_ACTION_COMPLETE;
  1104. }/*end getattr_mirrored_handles_safety_check*/
  1105.  
  1106.  
  1107.  
  1108.  
  1109. static PINT_sm_action getattr_read_metafile_distribution_if_required(
  1110.         struct PINT_smcb *smcb, job_status_s *js_p)
  1111. {
  1112.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1113.     int ret = -PVFS_EINVAL;
  1114.     job_id_t i;
  1115.  
  1116.     assert(s_op->attr.objtype == PVFS_TYPE_METAFILE);
  1117.  
  1118.     js_p->error_code = 0;
  1119.  
  1120.     /* if we don't need to fill in the distribution, skip it */
  1121.     if (!(s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST))
  1122.     {
  1123.         gossip_debug(GOSSIP_GETATTR_DEBUG, "skipping data handle "
  1124.                      "distribution read\n");
  1125.         return SM_ACTION_COMPLETE;
  1126.     }
  1127.  
  1128.     s_op->key.buffer = Trove_Common_Keys[METAFILE_DIST_KEY].key;
  1129.     s_op->key.buffer_sz = Trove_Common_Keys[METAFILE_DIST_KEY].size;
  1130.  
  1131.     /*
  1132.       there *should* be some distribution information.  if not, dump
  1133.       which handle is busted and assertion die for now while we're not
  1134.       handling this kind of error
  1135.     */
  1136.     if (s_op->resp.u.getattr.attr.u.meta.dist_size < 1)
  1137.     {
  1138.         gossip_err("Cannot Read Dist!  Got an invalid dist size for "
  1139.                    "handle %llu,%d\n",llu(s_op->u.getattr.handle),
  1140.                    s_op->u.getattr.fs_id);
  1141.         js_p->error_code = -PVFS_EINVAL;
  1142.         return SM_ACTION_COMPLETE;
  1143.     }
  1144.     assert(s_op->resp.u.getattr.attr.u.meta.dist_size > 0);
  1145.  
  1146.     /* add mask value to indicate the distribution is filled */
  1147.     s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_META_DIST;
  1148.  
  1149.     if(s_op->free_val)
  1150.     {
  1151.         free(s_op->val.buffer);
  1152.     }
  1153.     s_op->val.buffer_sz = s_op->resp.u.getattr.attr.u.meta.dist_size; 
  1154.     s_op->val.buffer = malloc(s_op->val.buffer_sz);
  1155.     if (!s_op->val.buffer)
  1156.     {
  1157.         gossip_err("Cannot allocate dist of size %d\n",
  1158.                    s_op->val.buffer_sz);
  1159.     js_p->error_code = -PVFS_ENOMEM;
  1160.     return SM_ACTION_COMPLETE;
  1161.     }
  1162.     s_op->free_val = 1;
  1163.  
  1164.     ret = job_trove_keyval_read(
  1165.         s_op->u.getattr.fs_id, s_op->u.getattr.handle,
  1166.         &(s_op->key), &(s_op->val),
  1167.         0,
  1168.         NULL,
  1169.         smcb, 0, js_p, &i, server_job_context, s_op->req->hints);
  1170.  
  1171.     return ret;
  1172. }
  1173.  
  1174. static PINT_sm_action getattr_read_stuffed_size(
  1175.     struct PINT_smcb *smcb, job_status_s *js_p)
  1176. {
  1177.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1178.     job_id_t job_id;
  1179.  
  1180.     if(js_p->error_code == -TROVE_ENOENT)
  1181.     {
  1182.         gossip_debug(
  1183.             GOSSIP_GETATTR_DEBUG, "Getattr detected non-stuffed file.\n");
  1184.         /* this means that the keyval fields used to indicate a file is
  1185.          * stuffed are not present.  Set mask accordingly and continue.
  1186.          */
  1187.         s_op->resp.u.getattr.attr.mask |= PVFS_ATTR_META_UNSTUFFED;
  1188.         js_p->error_code = 0;
  1189.         return SM_ACTION_COMPLETE;
  1190.     }
  1191.     if(js_p->error_code)
  1192.     {
  1193.         /* any other error code here is just a normal error case */
  1194.         /* preserve error code and catch next error transition */
  1195.         return SM_ACTION_COMPLETE;
  1196.     }
  1197.  
  1198.     gossip_debug(
  1199.         GOSSIP_GETATTR_DEBUG, "Getattr detected stuffed file.\n");
  1200.     /* otherwise, we found keyval fields indicating that the file is
  1201.      * stuffed.  It does not matter if the client asked for the size or not;
  1202.      * we must retrieve a valid stuffed_size value for the attrs.
  1203.      */
  1204.     s_op->resp.u.getattr.attr.mask &= (~(PVFS_ATTR_META_UNSTUFFED));
  1205.  
  1206.     return(job_trove_dspace_getattr(
  1207.         s_op->u.getattr.fs_id,
  1208.         s_op->resp.u.getattr.attr.u.meta.dfile_array[0],
  1209.         smcb,
  1210.         &s_op->ds_attr,
  1211.         0,
  1212.         js_p,
  1213.         &job_id,
  1214.         server_job_context,
  1215.         s_op->req->hints));
  1216. }
  1217.  
  1218. static PINT_sm_action getattr_interpret_stuffed_size(
  1219.     struct PINT_smcb *smcb, job_status_s *js_p)
  1220. {
  1221.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1222.     PVFS_metafile_attr *meta    = &(s_op->resp.u.getattr.attr.u.meta);
  1223.  
  1224.     if(js_p->error_code == 0)
  1225.     {
  1226.         meta->stuffed_size = s_op->ds_attr.u.datafile.b_size;
  1227.     }
  1228.  
  1229.     /* deliberately leave error_code unchanged so that any errors get 
  1230.      * handled in the next state
  1231.      */
  1232.     return SM_ACTION_COMPLETE;
  1233. }
  1234.  
  1235.  
  1236. /* interpret_metafile_distribution()
  1237.  *
  1238.  * capture and encode results of reading distribution
  1239.  */
  1240. static PINT_sm_action interpret_metafile_distribution(
  1241.         struct PINT_smcb *smcb, job_status_s *js_p)
  1242. {
  1243.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1244.     PVFS_object_attr *resp_attr = &s_op->resp.u.getattr.attr;
  1245.     
  1246.     if(js_p->error_code < 0)
  1247.     {
  1248.         return SM_ACTION_COMPLETE;
  1249.     }
  1250.  
  1251.     if(s_op->u.getattr.attrmask & PVFS_ATTR_META_DIST)
  1252.     {
  1253.         /* successfully read dist key; make sure we got something valid */
  1254.         if(s_op->val.read_sz != s_op->val.buffer_sz)
  1255.         {
  1256.             gossip_err("Error: %s key found val size: %d when "
  1257.                        "expecting val size: %d\n",
  1258.                 Trove_Common_Keys[METAFILE_DIST_KEY].key,
  1259.                 s_op->val.read_sz,
  1260.                 s_op->val.buffer_sz);
  1261.  
  1262.             /* clear bitmask to prevent double free between setup_resp and
  1263.              * PINT_free_object_attr()
  1264.              */
  1265.             s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
  1266.  
  1267.             js_p->error_code = -PVFS_EIO;
  1268.             return SM_ACTION_COMPLETE;
  1269.         }
  1270.  
  1271.         assert(s_op->val.buffer);
  1272.         PINT_dist_decode(&resp_attr->u.meta.dist, s_op->val.buffer);
  1273.  
  1274.         if(resp_attr->u.meta.dist == 0) {
  1275.             gossip_err("Found dist of 0 for handle %llu,%d\n",
  1276.                     llu(s_op->u.getattr.handle), s_op->u.getattr.fs_id);
  1277.             PVFS_perror("Metafile getattr_setup_resp",js_p->error_code);
  1278.             js_p->error_code = -PVFS_EIO;
  1279.             return SM_ACTION_COMPLETE;
  1280.         }
  1281.     }
  1282.  
  1283.     js_p->error_code = 0;
  1284.     return SM_ACTION_COMPLETE;
  1285. }
  1286.  
  1287. static PINT_sm_action getattr_setup_resp(
  1288.         struct PINT_smcb *smcb, job_status_s *js_p)
  1289. {
  1290.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1291.     PVFS_object_attr *resp_attr = &s_op->resp.u.getattr.attr;
  1292.  
  1293.     if(js_p->error_code > 0)
  1294.     {
  1295.         /* if we reach this state with a positive error code it means that
  1296.          * nothing is wrong; we just used one of the explicit STATE_*
  1297.          * transitions
  1298.          */
  1299.         js_p->error_code = 0;
  1300.     }
  1301.     if(js_p->error_code < 0)
  1302.     {
  1303.         free_nested_getattr_data(s_op);
  1304.         return SM_ACTION_COMPLETE;
  1305.     }
  1306.  
  1307.     gossip_debug(
  1308.         GOSSIP_GETATTR_DEBUG,
  1309.         "-  retrieved attrs: [owner = %d, group = %d\n\t"
  1310.         "perms = %o, type = %d, atime = %llu, mtime = %llu\n\t"
  1311.         "ctime = %llu, dist_size = %d]\n",
  1312.         resp_attr->owner, resp_attr->group, resp_attr->perms,
  1313.         resp_attr->objtype, llu(resp_attr->atime),
  1314.         llu(resp_attr->mtime), llu(resp_attr->ctime),
  1315.         (int)resp_attr->u.meta.dist_size);
  1316.  
  1317.     if (resp_attr->objtype == PVFS_TYPE_METAFILE)
  1318.     {
  1319.         if (resp_attr->mask & PVFS_ATTR_META_DFILES)
  1320.         {
  1321.             if (resp_attr->u.meta.dfile_count)
  1322.             {
  1323.                 assert(resp_attr->u.meta.dfile_array);
  1324.             }
  1325.             gossip_debug(GOSSIP_GETATTR_DEBUG,
  1326.                          "  also returning %d datafile handles\n",
  1327.                          resp_attr->u.meta.dfile_count);
  1328.         }
  1329.         if (resp_attr->mask & PVFS_ATTR_META_MIRROR_DFILES)
  1330.         {
  1331.            if (resp_attr->u.meta.mirror_copies_count)
  1332.               assert(resp_attr->u.meta.mirror_dfile_array);
  1333.            gossip_debug(GOSSIP_GETATTR_DEBUG,
  1334.                         "  also returning %d mirrored copies\n"
  1335.                        ,resp_attr->u.meta.mirror_copies_count);
  1336.         }
  1337.         if (resp_attr->mask & PVFS_ATTR_META_DIST)
  1338.         {
  1339.             /* we have already gathered the dist field in an earlier state */
  1340.             gossip_debug(GOSSIP_GETATTR_DEBUG,
  1341.                          "  also returning dist size of %d\n",
  1342.                          resp_attr->u.meta.dist_size);
  1343.         }
  1344.     }
  1345.     else if ((resp_attr->objtype == PVFS_TYPE_DATAFILE) &&
  1346.              (resp_attr->mask & PVFS_ATTR_DATA_SIZE))
  1347.     {
  1348.         gossip_debug(GOSSIP_GETATTR_DEBUG,
  1349.                      "  also returning data size of %lld\n",
  1350.                      lld(resp_attr->u.data.size));
  1351.     }
  1352.     else if ((resp_attr->objtype == PVFS_TYPE_SYMLINK) &&
  1353.              (resp_attr->mask & PVFS_ATTR_SYMLNK_TARGET))
  1354.     {
  1355.         if (js_p->error_code == 0)
  1356.         {
  1357.             assert(resp_attr->u.sym.target_path);
  1358.             assert(resp_attr->u.sym.target_path_len);
  1359.             /*
  1360.               adjust target path len down to actual size ; always
  1361.               include the null termination char in the target_path_len
  1362.             */
  1363.             resp_attr->u.sym.target_path_len =
  1364.                 (strlen(resp_attr->u.sym.target_path) + 1);
  1365.  
  1366.             gossip_debug(GOSSIP_GETATTR_DEBUG,
  1367.                          "  also returning link target of %s (len %d)\n",
  1368.                          resp_attr->u.sym.target_path,
  1369.                          resp_attr->u.sym.target_path_len);
  1370.         }
  1371.         else
  1372.         {
  1373.             gossip_err("Failed to retrieve symlink target path for "
  1374.                        "handle %llu,%d\n",llu(s_op->u.getattr.handle),
  1375.                        s_op->u.getattr.fs_id);
  1376.             PVFS_perror("Symlink retrieval failure",js_p->error_code);
  1377.  
  1378.             free_nested_getattr_data(s_op);
  1379.             js_p->error_code = -PVFS_EINVAL;
  1380.             return SM_ACTION_COMPLETE;
  1381.         }
  1382.     }
  1383.     else if ((resp_attr->objtype == PVFS_TYPE_DIRECTORY) &&
  1384.             (resp_attr->mask & PVFS_ATTR_DIR_HINT))
  1385.     {
  1386.         gossip_debug(GOSSIP_GETATTR_DEBUG, " server returning "
  1387.             "dirent_count = %llu "
  1388.             "dfile_count = %d "
  1389.             "dist_name_len    = %d "
  1390.             "dist_params_len  = %d\n",
  1391.             llu(resp_attr->u.dir.dirent_count),
  1392.             resp_attr->u.dir.hint.dfile_count,
  1393.             resp_attr->u.dir.hint.dist_name_len,
  1394.             resp_attr->u.dir.hint.dist_params_len);
  1395.     }
  1396.  
  1397.     gossip_debug(GOSSIP_GETATTR_DEBUG,"@ End %s attributes: sending "
  1398.                  "status %d (error = %d)\n",
  1399.                  PINT_util_get_object_type(resp_attr->objtype),
  1400.          s_op->resp.status, js_p->error_code);
  1401.  
  1402. #if 0
  1403.     gossip_debug(GOSSIP_GETATTR_DEBUG, "returning attrmask ");
  1404.     PINT_attrmask_print(GOSSIP_GETATTR_DEBUG,
  1405.                         s_op->resp.u.getattr.attr.mask);
  1406. #endif
  1407.  
  1408.     free_nested_getattr_data(s_op);
  1409.     return SM_ACTION_COMPLETE;
  1410. }
  1411.  
  1412. static void free_nested_getattr_data(struct PINT_server_op *s_op)
  1413. {
  1414.    int i;
  1415.     /* free up anything that was set up specifically by this nested machine */
  1416.     if (s_op->free_val)
  1417.     {
  1418.        for (i=0; i<s_op->keyval_count; i++)
  1419.        {
  1420.            if (s_op->val_a[i].buffer)
  1421.                free(s_op->val_a[i].buffer);
  1422.        }
  1423.     }
  1424.     if(s_op->val_a)
  1425.     {
  1426.         free(s_op->val_a);
  1427.         s_op->val_a = NULL;
  1428.     }
  1429.     if(s_op->key_a)
  1430.     {
  1431.         free(s_op->key_a);
  1432.         s_op->key_a = NULL;
  1433.     }
  1434.     if(s_op->u.getattr.err_array)
  1435.     {
  1436.         free(s_op->u.getattr.err_array);
  1437.         s_op->u.getattr.err_array = NULL;
  1438.     }
  1439.     if (s_op->u.getattr.mirror_dfile_status_array)
  1440.     {
  1441.         free(s_op->u.getattr.mirror_dfile_status_array);
  1442.         s_op->u.getattr.mirror_dfile_status_array = NULL;
  1443.     }
  1444.     if(s_op->free_val)
  1445.     {
  1446.         free(s_op->val.buffer);
  1447.         s_op->val.buffer = NULL;
  1448.     }
  1449.     if(s_op->error_a)
  1450.     {
  1451.         free(s_op->error_a);
  1452.         s_op->error_a = NULL;
  1453.     }
  1454.  
  1455.     return;
  1456. }
  1457.  
  1458. static PINT_sm_action getattr_cleanup(
  1459.         struct PINT_smcb *smcb, job_status_s *js_p)
  1460. {
  1461.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1462.  
  1463.     PINT_free_object_attr(&s_op->resp.u.getattr.attr);
  1464.     return(server_state_machine_complete(smcb));
  1465. }
  1466.  
  1467. static PINT_sm_action getattr_with_prelude_init(
  1468.         struct PINT_smcb *smcb, job_status_s *js_p)
  1469. {
  1470.     js_p->error_code = 0;
  1471.     return SM_ACTION_COMPLETE;
  1472. }
  1473.  
  1474. static PINT_sm_action getattr_setup_op(
  1475.         struct PINT_smcb *smcb, job_status_s *js_p)
  1476. {
  1477.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1478.     s_op->u.getattr.handle = s_op->req->u.getattr.handle;
  1479.     s_op->u.getattr.fs_id = s_op->req->u.getattr.fs_id;
  1480.     s_op->u.getattr.attrmask = s_op->req->u.getattr.attrmask;
  1481.     s_op->u.getattr.err_array = NULL;
  1482.     s_op->u.getattr.mirror_dfile_status_array = NULL;
  1483.  
  1484.     js_p->error_code = 0;
  1485.     return SM_ACTION_COMPLETE;
  1486. }
  1487.  
  1488. static PINT_sm_action getattr_datafile_handles_safety_check(
  1489.         struct PINT_smcb *smcb, job_status_s *js_p)
  1490. {
  1491.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1492.  
  1493.     if((js_p->error_code == 0) &&
  1494.         (s_op->u.getattr.attrmask & PVFS_ATTR_META_DFILES))
  1495.     {
  1496.         /* successfully read datafile key; make sure we got something valid */
  1497.         if(s_op->val.read_sz != s_op->val.buffer_sz)
  1498.         {
  1499.             gossip_err("Error: %s key found val size: %d when "
  1500.                        "expecting val size: %d\n",
  1501.                 Trove_Common_Keys[METAFILE_HANDLES_KEY].key,
  1502.                 s_op->val.read_sz,
  1503.                 s_op->val.buffer_sz);
  1504.  
  1505.             /* clear bitmask to prevent double free between setup_resp and
  1506.              * PINT_free_object_attr()
  1507.              */
  1508.             s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DFILES;
  1509.             s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_DIST;
  1510.             s_op->resp.u.getattr.attr.mask &= ~PVFS_ATTR_META_MIRROR_DFILES;
  1511.  
  1512.             js_p->error_code = -PVFS_EIO;
  1513.             return SM_ACTION_COMPLETE;
  1514.         }
  1515.     }
  1516.  
  1517.     /* otherwise deliberately preserve existing error code */
  1518.     return SM_ACTION_COMPLETE;
  1519. }
  1520.  
  1521. static PINT_sm_action getattr_get_dirdata_handle(
  1522.         struct PINT_smcb *smcb, job_status_s *js_p)
  1523. {
  1524.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1525.     int ret;
  1526.     job_id_t tmp_id;
  1527.  
  1528.     s_op->key.buffer = Trove_Common_Keys[DIR_ENT_KEY].key;
  1529.     s_op->key.buffer_sz = Trove_Common_Keys[DIR_ENT_KEY].size;
  1530.     if(s_op->free_val)
  1531.     {
  1532.         free(s_op->val.buffer);
  1533.     }
  1534.     s_op->val.buffer = &s_op->u.getattr.dirent_handle;
  1535.     s_op->val.buffer_sz = sizeof(PVFS_handle);
  1536.     s_op->free_val = 0;
  1537.  
  1538.     ret = job_trove_keyval_read(
  1539.         s_op->u.getattr.fs_id, s_op->u.getattr.handle,
  1540.         &s_op->key, &s_op->val,
  1541.         0,
  1542.         NULL,
  1543.         smcb,
  1544.         0,
  1545.         js_p,
  1546.         &tmp_id,
  1547.         server_job_context, s_op->req->hints);
  1548.  
  1549.     return ret;
  1550. }
  1551.         
  1552. static PINT_sm_action getattr_get_dirent_count(
  1553.         struct PINT_smcb *smcb, job_status_s *js_p)
  1554. {
  1555.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1556.     int ret;
  1557.     job_id_t tmp_id;
  1558.  
  1559.     if (!(s_op->u.getattr.attrmask & PVFS_ATTR_DIR_DIRENT_COUNT))
  1560.     {
  1561.          /* the caller didn't really want the dirent count; skip to get
  1562.           * directory hints
  1563.           */
  1564.          js_p->error_code = STATE_DIR_HINT;
  1565.          return SM_ACTION_COMPLETE;
  1566.     }
  1567.     ret = job_trove_keyval_get_handle_info(
  1568.         s_op->u.getattr.fs_id,
  1569.         s_op->u.getattr.dirent_handle,
  1570.         TROVE_KEYVAL_HANDLE_COUNT |
  1571.         0,
  1572.         &s_op->u.getattr.keyval_handle_info,
  1573.         smcb,
  1574.         0,
  1575.         js_p,
  1576.         &tmp_id,
  1577.         server_job_context, s_op->req->hints);
  1578.  
  1579.     return ret;
  1580. }
  1581.  
  1582. static PINT_sm_action getattr_interpret_dirent_count(
  1583.         struct PINT_smcb *smcb, job_status_s *js_p)
  1584. {
  1585.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1586.     switch(js_p->error_code)
  1587.     {
  1588.         case -TROVE_ENOENT:
  1589.             js_p->error_code = 0;
  1590.             s_op->resp.u.getattr.attr.u.dir.dirent_count = 0;
  1591.             break;
  1592.         case 0:
  1593.             s_op->resp.u.getattr.attr.u.dir.dirent_count =
  1594.                 s_op->u.getattr.keyval_handle_info.count;
  1595.             break;
  1596.         default:
  1597.             return SM_ACTION_COMPLETE;
  1598.     }
  1599.  
  1600.     gossip_debug(GOSSIP_GETATTR_DEBUG, "getattr: dirent_count: %lld\n",
  1601.         lld(s_op->resp.u.getattr.attr.u.dir.dirent_count));
  1602.  
  1603.     return SM_ACTION_COMPLETE;
  1604. }
  1605.  
  1606. static PINT_sm_action getattr_get_dir_hint(
  1607.         struct PINT_smcb *smcb, job_status_s *js_p)
  1608. {
  1609.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1610.     int ret, i;
  1611.     job_id_t tmp_id;
  1612.  
  1613.     /* NOTE: memory allocations are released in the getattr_cleanup()
  1614.      * function 
  1615.      */
  1616.    
  1617.     if (!(s_op->u.getattr.attrmask & PVFS_ATTR_DIR_HINT))
  1618.     {
  1619.         /* the caller didn't really want the dir hints; skip
  1620.          */
  1621.         js_p->error_code = STATE_DONE;
  1622.         return SM_ACTION_COMPLETE;
  1623.     }
  1624.  
  1625.  
  1626.     gossip_debug(GOSSIP_SERVER_DEBUG, "  trying to getxattr of %s,%s,%s "
  1627.                  "of dir handle (coll_id = %d, handle = %llu\n",
  1628.                  Trove_Special_Keys[DIST_NAME_KEY].key,
  1629.                  Trove_Special_Keys[DIST_PARAMS_KEY].key, 
  1630.                  Trove_Special_Keys[NUM_DFILES_KEY].key,
  1631.                  s_op->u.getattr.fs_id, llu(s_op->u.getattr.handle));
  1632.  
  1633.     s_op->resp.u.getattr.attr.u.dir.hint.dist_params = 
  1634.         (char *) calloc(1, PVFS_REQ_LIMIT_DIST_BYTES);
  1635.     if (!s_op->resp.u.getattr.attr.u.dir.hint.dist_params)
  1636.     {
  1637.         js_p->error_code = -PVFS_ENOMEM;
  1638.         return SM_ACTION_COMPLETE;
  1639.     }
  1640.     s_op->resp.u.getattr.attr.u.dir.hint.dist_params_len = 
  1641.         PVFS_REQ_LIMIT_DIST_BYTES;
  1642.  
  1643.     s_op->resp.u.getattr.attr.u.dir.hint.dist_name = 
  1644.         (char *) calloc(1, PVFS_REQ_LIMIT_DIST_NAME);
  1645.     if (!s_op->resp.u.getattr.attr.u.dir.hint.dist_name)
  1646.     {
  1647.         js_p->error_code = -PVFS_ENOMEM;
  1648.         return SM_ACTION_COMPLETE;
  1649.     }
  1650.     s_op->resp.u.getattr.attr.u.dir.hint.dist_name_len   = 
  1651.         PVFS_REQ_LIMIT_DIST_NAME;
  1652.  
  1653.     s_op->key_a = 
  1654.         (PVFS_ds_keyval *) calloc(NUM_SPECIAL_KEYS, sizeof(PVFS_ds_keyval));
  1655.     if (s_op->key_a == NULL)
  1656.     {
  1657.         js_p->error_code = -PVFS_ENOMEM;
  1658.         return SM_ACTION_COMPLETE;
  1659.     }
  1660.     s_op->val_a = (PVFS_ds_keyval *) calloc(NUM_SPECIAL_KEYS
  1661.                                            ,sizeof(PVFS_ds_keyval));
  1662.     if (s_op->val_a == NULL)
  1663.     {
  1664.         js_p->error_code = -PVFS_ENOMEM;
  1665.         return SM_ACTION_COMPLETE;
  1666.     }
  1667.     s_op->u.getattr.err_array = (PVFS_error*)calloc(NUM_SPECIAL_KEYS,
  1668.         sizeof(PVFS_error));
  1669.     if(s_op->u.getattr.err_array == NULL)
  1670.     {
  1671.         js_p->error_code = -PVFS_ENOMEM;
  1672.         return SM_ACTION_COMPLETE;
  1673.  
  1674.     }
  1675.  
  1676.     s_op->free_val = 0;
  1677.     s_op->keyval_count = NUM_SPECIAL_KEYS;
  1678.     for (i = 0; i < NUM_SPECIAL_KEYS; i++)
  1679.     {
  1680.         s_op->key_a[i].buffer = Trove_Special_Keys[i].key;
  1681.         s_op->key_a[i].buffer_sz = Trove_Special_Keys[i].size;
  1682.         if (i == NUM_DFILES_KEY)
  1683.         {
  1684.             s_op->val_a[i].buffer = (char *) calloc(1, 16);
  1685.             if(s_op->val_a[i].buffer == NULL)
  1686.             {
  1687.                 js_p->error_code = -PVFS_ENOMEM;
  1688.                 return SM_ACTION_COMPLETE;
  1689.             }
  1690.             s_op->val_a[i].buffer_sz = 16;
  1691.         }
  1692.         else if (i == DIST_PARAMS_KEY) {
  1693.             s_op->val_a[i].buffer 
  1694.             = s_op->resp.u.getattr.attr.u.dir.hint.dist_params;
  1695.             s_op->val_a[i].buffer_sz 
  1696.             = s_op->resp.u.getattr.attr.u.dir.hint.dist_params_len;
  1697.         }
  1698.         else if (i == DIST_NAME_KEY) {
  1699.             s_op->val_a[i].buffer 
  1700.             = s_op->resp.u.getattr.attr.u.dir.hint.dist_name;
  1701.             s_op->val_a[i].buffer_sz 
  1702.             = s_op->resp.u.getattr.attr.u.dir.hint.dist_name_len;
  1703.         }
  1704.     }
  1705.  
  1706.     js_p->error_code = 0;
  1707.     ret = job_trove_keyval_read_list(
  1708.         s_op->u.getattr.fs_id, 
  1709.         s_op->u.getattr.handle,
  1710.         s_op->key_a, s_op->val_a, s_op->u.getattr.err_array, NUM_SPECIAL_KEYS,
  1711.         0, NULL, smcb, 0, js_p, &tmp_id,
  1712.         server_job_context, s_op->req->hints);
  1713.  
  1714.     return ret;
  1715. }
  1716.  
  1717. static PINT_sm_action getattr_interpret_dir_hint(
  1718.         struct PINT_smcb *smcb, job_status_s *js_p)
  1719. {
  1720.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1721.     if(js_p->error_code != 0 && js_p->error_code != -TROVE_ENOENT)
  1722.     {
  1723.         /* if we failed to get any of the keys, and the error code is due to
  1724.          * something other than the keys simply not being present, then
  1725.          * propigate the error.
  1726.          */
  1727.         return SM_ACTION_COMPLETE;
  1728.     }
  1729.  
  1730.     gossip_debug(GOSSIP_SERVER_DEBUG, 
  1731.         "getattr: job status code = %d\n", js_p->error_code);
  1732.     if (s_op->val_a && s_op->key_a)
  1733.     {
  1734.         long int dfile_count = 0;
  1735.  
  1736.         if (s_op->u.getattr.err_array[DIST_NAME_KEY] == 0)
  1737.         {
  1738.             gossip_debug(GOSSIP_SERVER_DEBUG, 
  1739.                 "val_a[DIST_NAME_KEY] %p read_sz = %d dist_name = %s\n", 
  1740.                 s_op->val_a[DIST_NAME_KEY].buffer, 
  1741.                 s_op->val_a[DIST_NAME_KEY].read_sz,
  1742.                 (char *)s_op->val_a[DIST_NAME_KEY].buffer);
  1743.             s_op->resp.u.getattr.attr.u.dir.hint.dist_name_len = 
  1744.                 s_op->val_a[DIST_NAME_KEY].read_sz;
  1745.         }
  1746.         else
  1747.         {
  1748.             s_op->resp.u.getattr.attr.u.dir.hint.dist_name_len = 0;
  1749.         }            
  1750.         s_op->val_a[DIST_NAME_KEY].buffer = NULL;
  1751.  
  1752.         if (s_op->u.getattr.err_array[DIST_PARAMS_KEY] == 0)
  1753.         {
  1754.             gossip_debug(GOSSIP_SERVER_DEBUG, 
  1755.                 "val_a[DIST_PARAMS_KEY] %p read_sz = %d dist_params = %s\n", 
  1756.                 s_op->val_a[DIST_PARAMS_KEY].buffer, 
  1757.                 s_op->val_a[DIST_PARAMS_KEY].read_sz,
  1758.                 (char *)s_op->val_a[DIST_PARAMS_KEY].buffer);
  1759.             s_op->resp.u.getattr.attr.u.dir.hint.dist_params_len = 
  1760.                 s_op->val_a[DIST_PARAMS_KEY].read_sz;
  1761.         }
  1762.         else
  1763.         {
  1764.             s_op->resp.u.getattr.attr.u.dir.hint.dist_params_len = 0;
  1765.         }
  1766.         if (s_op->val_a[DIST_PARAMS_KEY].buffer)
  1767.         {
  1768.            s_op->val_a[DIST_PARAMS_KEY].buffer = NULL;
  1769.         }
  1770.  
  1771.  
  1772.  
  1773.         if (s_op->u.getattr.err_array[NUM_DFILES_KEY] == 0)
  1774.         {
  1775.             char *endptr = NULL;
  1776.             gossip_debug(GOSSIP_SERVER_DEBUG, "val_a[NUM_DFILES_KEY] %p "
  1777.                                               "read_sz = %d\n", 
  1778.                 s_op->val_a[NUM_DFILES_KEY].buffer, 
  1779.                 s_op->val_a[NUM_DFILES_KEY].read_sz);
  1780.             dfile_count = strtol(s_op->val_a[NUM_DFILES_KEY].buffer
  1781.                                 , &endptr, 10);
  1782.             if (*endptr != '\0' || dfile_count < 0)
  1783.             {
  1784.                 dfile_count = 0;
  1785.             }
  1786.         }
  1787.         if(s_op->val_a[NUM_DFILES_KEY].buffer)
  1788.         {
  1789.             free(s_op->val_a[NUM_DFILES_KEY].buffer);
  1790.             s_op->val_a[NUM_DFILES_KEY].buffer = NULL;
  1791.             s_op->val_a[NUM_DFILES_KEY].buffer_sz = 0;
  1792.         }
  1793.         s_op->keyval_count = 0;
  1794.         s_op->free_val = 0;
  1795.  
  1796.  
  1797.         s_op->resp.u.getattr.attr.u.dir.hint.dfile_count = dfile_count;
  1798.  
  1799.         gossip_debug(GOSSIP_SERVER_DEBUG, "getattr: dir hint dfile_count: %d\n",
  1800.             s_op->resp.u.getattr.attr.u.dir.hint.dfile_count);
  1801.  
  1802.         js_p->error_code = 0;
  1803.     }/* end if val_a and key_a */
  1804.     return SM_ACTION_COMPLETE;
  1805. }
  1806.  
  1807. /* getattr_detect_stuffed()
  1808.  *
  1809.  * determine if a file is stuffed or not
  1810.  */
  1811. static PINT_sm_action getattr_detect_stuffed(
  1812.         struct PINT_smcb *smcb, job_status_s *js_p)
  1813. {
  1814.     struct PINT_server_op *s_op = PINT_sm_frame(smcb, PINT_FRAME_CURRENT);
  1815.     job_id_t tmp_id;
  1816.  
  1817.     /* we can determine stuffedness by the presence of the dfiles req key */
  1818.  
  1819.     s_op->key.buffer = Trove_Common_Keys[NUM_DFILES_REQ_KEY].key;
  1820.     s_op->key.buffer_sz = Trove_Common_Keys[NUM_DFILES_REQ_KEY].size;
  1821.     if(s_op->free_val)
  1822.     {
  1823.         free(s_op->val.buffer);
  1824.     }
  1825.     s_op->val.buffer =  &s_op->u.getattr.num_dfiles_req;
  1826.     s_op->val.buffer_sz =  sizeof(s_op->u.getattr.num_dfiles_req);
  1827.     s_op->free_val = 0;
  1828.  
  1829.     return(job_trove_keyval_read(
  1830.         s_op->u.getattr.fs_id, 
  1831.         s_op->u.getattr.handle,
  1832.         &(s_op->key), 
  1833.         &(s_op->val), 
  1834.         0, 
  1835.         NULL, smcb, 0, js_p,
  1836.         &tmp_id, server_job_context,
  1837.         s_op->req->hints));
  1838. }
  1839.  
  1840. PINT_GET_OBJECT_REF_DEFINE(getattr);
  1841.  
  1842. struct PINT_server_req_params pvfs2_get_attr_params =
  1843. {
  1844.     .string_name = "getattr",
  1845.     .perm = PINT_SERVER_CHECK_ATTR,
  1846.     .access_type = PINT_server_req_readonly,
  1847.     .sched_policy = PINT_SERVER_REQ_SCHEDULE,
  1848.     .get_object_ref = PINT_get_object_ref_getattr,
  1849.     .state_machine = &pvfs2_get_attr_sm
  1850. };
  1851.  
  1852. /*
  1853.  * Local variables:
  1854.  *  mode: c
  1855.  *  c-indent-level: 4
  1856.  *  c-basic-offset: 4
  1857.  * End:
  1858.  *
  1859.  * vim: ft=c ts=8 sts=4 sw=4 expandtab
  1860.  */
  1861.  
  1862.